Skip to content

Commit 929712b

Browse files
nathanborrormattt
andauthored
Actually stream the data (#31)
* Actually stream the data * Apparently `Task { @mainactor in ... }` is non-blocking but `await MainActor.run` is blocking * Revert last change, it's not the problem * Should only need one while loop * Use EventSource convenience method * Add Linux specific implementation for request streaming --------- Co-authored-by: Mattt Zmuda <[email protected]>
1 parent da8ee07 commit 929712b

File tree

1 file changed

+123
-10
lines changed

1 file changed

+123
-10
lines changed

Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift

Lines changed: 123 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,30 +142,31 @@ extension URLSession {
142142
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
143143
}
144144

145-
let (asyncBytes, response) = try await self.data(for: request)
145+
#if canImport(FoundationNetworking)
146+
let (asyncBytes, response) = try await self.linuxBytes(for: request)
147+
#else
148+
let (asyncBytes, response) = try await self.bytes(for: request)
149+
#endif
146150

147151
guard let httpResponse = response as? HTTPURLResponse else {
148152
throw URLSessionError.invalidResponse
149153
}
150154

151155
guard (200 ..< 300).contains(httpResponse.statusCode) else {
152-
if let errorString = String(data: asyncBytes, encoding: .utf8) {
156+
var errorData = Data()
157+
for try await byte in asyncBytes {
158+
errorData.append(byte)
159+
}
160+
if let errorString = String(data: errorData, encoding: .utf8) {
153161
throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: errorString)
154162
}
155163
throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: "Invalid response")
156164
}
157165

158166
let decoder = JSONDecoder()
159-
let parser = EventSource.Parser()
160-
161-
for byte in asyncBytes {
162-
await parser.consume(byte)
163-
}
164-
await parser.finish()
165167

166-
while let event = await parser.getNextEvent() {
168+
for try await event in asyncBytes.events {
167169
guard let data = event.data.data(using: .utf8) else { continue }
168-
169170
if let decoded = try? decoder.decode(T.self, from: data) {
170171
continuation.yield(decoded)
171172
}
@@ -184,6 +185,118 @@ extension URLSession {
184185
}
185186
}
186187

188+
#if canImport(FoundationNetworking)
189+
private extension URLSession {
190+
func linuxBytes(for request: URLRequest) async throws -> (AsyncThrowingStream<UInt8, Error>, URLResponse) {
191+
let delegate = LinuxBytesDelegate()
192+
let delegateQueue = OperationQueue()
193+
delegateQueue.maxConcurrentOperationCount = 1
194+
195+
let session = URLSession(
196+
configuration: self.configuration,
197+
delegate: delegate,
198+
delegateQueue: delegateQueue
199+
)
200+
201+
let byteStream = AsyncThrowingStream<UInt8, Error> { continuation in
202+
delegate.attach(
203+
continuation,
204+
session: session
205+
)
206+
}
207+
208+
let response = try await delegate.start(
209+
request: request,
210+
session: session
211+
)
212+
213+
return (byteStream, response)
214+
}
215+
}
216+
217+
private final class LinuxBytesDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {
218+
private var responseContinuation: CheckedContinuation<URLResponse, Error>?
219+
private var byteContinuation: AsyncThrowingStream<UInt8, Error>.Continuation?
220+
private weak var task: URLSessionDataTask?
221+
private weak var session: URLSession?
222+
223+
func attach(
224+
_ continuation: AsyncThrowingStream<UInt8, Error>.Continuation,
225+
session: URLSession
226+
) {
227+
byteContinuation = continuation
228+
self.session = session
229+
continuation.onTermination = { [weak self] _ in
230+
guard let self else { return }
231+
self.task?.cancel()
232+
self.session?.invalidateAndCancel()
233+
}
234+
}
235+
236+
func start(
237+
request: URLRequest,
238+
session: URLSession
239+
) async throws -> URLResponse {
240+
try await withCheckedThrowingContinuation { continuation in
241+
responseContinuation = continuation
242+
let task = session.dataTask(with: request)
243+
self.task = task
244+
task.resume()
245+
}
246+
}
247+
248+
func urlSession(
249+
_ session: URLSession,
250+
dataTask: URLSessionDataTask,
251+
didReceive response: URLResponse,
252+
completionHandler: @escaping @Sendable (URLSession.ResponseDisposition) -> Void
253+
) {
254+
if let continuation = responseContinuation {
255+
continuation.resume(returning: response)
256+
responseContinuation = nil
257+
}
258+
completionHandler(.allow)
259+
}
260+
261+
func urlSession(
262+
_ session: URLSession,
263+
dataTask: URLSessionDataTask,
264+
didReceive data: Data
265+
) {
266+
guard let continuation = byteContinuation else { return }
267+
for byte in data {
268+
continuation.yield(byte)
269+
}
270+
}
271+
272+
func urlSession(
273+
_ session: URLSession,
274+
task: URLSessionTask,
275+
didCompleteWithError error: (any Error)?
276+
) {
277+
if let continuation = responseContinuation {
278+
if let error {
279+
continuation.resume(throwing: error)
280+
} else if let response = task.response {
281+
continuation.resume(returning: response)
282+
} else {
283+
continuation.resume(throwing: URLSessionError.invalidResponse)
284+
}
285+
responseContinuation = nil
286+
}
287+
288+
if let error {
289+
byteContinuation?.finish(throwing: error)
290+
} else {
291+
byteContinuation?.finish()
292+
}
293+
byteContinuation = nil
294+
295+
session.invalidateAndCancel()
296+
}
297+
}
298+
#endif
299+
187300
enum URLSessionError: Error, CustomStringConvertible {
188301
case invalidResponse
189302
case httpError(statusCode: Int, detail: String)

0 commit comments

Comments
 (0)