@@ -33,6 +33,7 @@ import com.comcast.ip4s.Host
3333import com .comcast .ip4s .IpAddress
3434import com .comcast .ip4s .Port
3535import com .comcast .ip4s .SocketAddress
36+ import fs2 .internal .jsdeps .node .eventsMod
3637import fs2 .internal .jsdeps .node .netMod
3738import fs2 .internal .jsdeps .node .nodeStrings
3839import fs2 .internal .jsdeps .std
@@ -81,40 +82,38 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
8182 ): Resource [F , (SocketAddress [IpAddress ], Stream [F , Socket [F ]])] =
8283 (for {
8384 dispatcher <- Dispatcher [F ]
84- queue <- Queue .unbounded[F , netMod.Socket ].toResource
85- error <- F .deferred[Throwable ].toResource
85+ queue <- Queue .unbounded[F , Option [netMod.Socket ]].toResource
8686 server <- Resource .make(
8787 F
8888 .delay(
8989 netMod.createServer(
9090 netMod.ServerOpts ().setPauseOnConnect(true ).setAllowHalfOpen(true ),
91- sock => dispatcher.unsafeRunAndForget(queue.offer(sock))
91+ sock => dispatcher.unsafeRunAndForget(queue.offer(Some ( sock) ))
9292 )
9393 )
9494 )(server =>
95- F .async_ [Unit ] { cb =>
95+ F .async [Unit ] { cb =>
9696 if (server.listening)
97- server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException )))
97+ F .delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException )))) >> queue
98+ .offer(None )
99+ .as(None )
98100 else
99- cb(Right (()))
101+ F .delay( cb(Right (()))).as( None )
100102 }
101103 )
102- _ <- registerListener[std.Error ](server, nodeStrings.error)(_.once_error(_, _)) { e =>
103- dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException (e)))
104- }
105- _ <- error.get
106- .race(
107- F
108- .async_[Unit ] { cb =>
109- server.listen(
110- address.foldLeft(
111- netMod.ListenOptions ().setPort(port.fold(0.0 )(_.value.toDouble))
112- )((opts, host) => opts.setHost(host.toString)),
113- () => cb(Right (()))
114- )
104+ _ <- F
105+ .async_[Unit ] { cb =>
106+ server.once_error(nodeStrings.error, e => cb(Left (js.JavaScriptException (e))))
107+ server.listen(
108+ address.foldLeft(
109+ netMod.ListenOptions ().setPort(port.fold(0.0 )(_.value.toDouble))
110+ )((opts, host) => opts.setHost(host.toString)),
111+ () => {
112+ server.asInstanceOf [eventsMod.EventEmitter ].removeAllListeners(" error" )
113+ cb(Right (()))
115114 }
116- )
117- .rethrow
115+ )
116+ }
118117 .toResource
119118 ipAddress <- F
120119 .delay(server.address())
@@ -124,10 +123,9 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
124123 }
125124 .toResource
126125 sockets = Stream
127- .fromQueueUnterminated (queue)
126+ .fromQueueNoneTerminated (queue)
128127 .evalTap(setSocketOptions(options))
129128 .flatMap(sock => Stream .resource(Socket .forAsync(sock)))
130- .concurrently(Stream .eval(error.get.flatMap(F .raiseError[Unit ])))
131129 } yield (ipAddress, sockets)).adaptError { case IOException (ex) => ex }
132130
133131 }
0 commit comments