From 188a96a7ba793fdea6c13d459e6b9c72345ef14a Mon Sep 17 00:00:00 2001 From: esmeetu Date: Tue, 4 Nov 2025 23:17:53 +0800 Subject: [PATCH 1/4] Fix blocking and log when enable logging Signed-off-by: esmeetu --- vllm/entrypoints/openai/api_server.py | 132 +++++++++++++++----------- 1 file changed, 79 insertions(+), 53 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index e184f22f3630..96605b90bf0b 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -31,7 +31,6 @@ from fastapi.responses import JSONResponse, Response, StreamingResponse from prometheus_client import make_asgi_app from prometheus_fastapi_instrumentator import Instrumentator -from starlette.concurrency import iterate_in_threadpool from starlette.datastructures import URL, Headers, MutableHeaders, State from starlette.routing import Mount from starlette.types import ASGIApp, Message, Receive, Scope, Send @@ -818,7 +817,7 @@ async def create_pooling(request: PoolingRequest, raw_request: Request): return JSONResponse( content=generator.model_dump(), status_code=generator.error.code ) - elif isinstance(generator, (PoolingResponse, IOProcessorResponse)): + elif isinstance(generator, PoolingResponse | IOProcessorResponse): return JSONResponse(content=generator.model_dump()) elif isinstance(generator, PoolingBytesResponse): return StreamingResponse( @@ -1151,7 +1150,7 @@ async def collective_rpc(raw_request: Request): return Response(status_code=200) response: list[Any] = [] for result in results: - if result is None or isinstance(result, (dict, list)): + if result is None or isinstance(result, dict | list): response.append(result) else: response.append(str(result)) @@ -1548,50 +1547,69 @@ def get_complete_content(self) -> str: return "".join(self.content_buffer) -def _log_streaming_response(response, response_body: list) -> None: - """Log streaming response with robust SSE parsing.""" - from starlette.concurrency import iterate_in_threadpool +def _log_streaming_response( + body_iterator: AsyncIterator[bytes], +) -> AsyncIterator[bytes]: + """Wrap an async body iterator to log SSE content while streaming.""" sse_decoder = SSEDecoder() chunk_count = 0 + done_logged = False - def buffered_iterator(): - nonlocal chunk_count - - for chunk in response_body: + async def generator(): + nonlocal chunk_count, done_logged + async for section in body_iterator: chunk_count += 1 - yield chunk - - # Parse SSE events from chunk - events = sse_decoder.decode_chunk(chunk) - - for event in events: - if event["type"] == "data": - content = sse_decoder.extract_content(event["data"]) - sse_decoder.add_content(content) - elif event["type"] == "done": - # Log complete content when done - full_content = sse_decoder.get_complete_content() - if full_content: - # Truncate if too long - if len(full_content) > 2048: - full_content = full_content[:2048] + "" - "...[truncated]" - logger.info( - "response_body={streaming_complete: " - "content='%s', chunks=%d}", - full_content, - chunk_count, - ) - else: - logger.info( - "response_body={streaming_complete: no_content, chunks=%d}", - chunk_count, - ) - return + try: + events = sse_decoder.decode_chunk(section) + for event in events: + if event["type"] == "data": + content = sse_decoder.extract_content(event["data"]) + sse_decoder.add_content(content) + elif event["type"] == "done" and not done_logged: + full_content = sse_decoder.get_complete_content() + if full_content: + if len(full_content) > 2048: + full_content = full_content[:2048] + logger.info( + ( + "response_body={streaming_complete: content=%r, " + "chunks=%d}" + ), + full_content, + chunk_count, + ) + else: + logger.info( + ( + "response_body={streaming_complete: no_content, " + "chunks=%d}" + ), + chunk_count, + ) + done_logged = True + except Exception: + # Best-effort logging; never break streaming + pass + yield section + # In case no explicit DONE was received, log buffered content once + if not done_logged: + full_content = sse_decoder.get_complete_content() + if full_content: + if len(full_content) > 2048: + full_content = full_content[:2048] + logger.info( + "response_body={streaming_complete: content=%r, chunks=%d}", + full_content, + chunk_count, + ) + else: + logger.info( + "response_body={streaming_complete: no_content, chunks=%d}", + chunk_count, + ) - response.body_iterator = iterate_in_threadpool(buffered_iterator()) - logger.info("response_body={streaming_started: chunks=%d}", len(response_body)) + return generator() def _log_non_streaming_response(response_body: list) -> None: @@ -1673,19 +1691,27 @@ async def validation_exception_handler(_: Request, exc: RequestValidationError): @app.middleware("http") async def log_response(request: Request, call_next): response = await call_next(request) - response_body = [section async for section in response.body_iterator] - response.body_iterator = iterate_in_threadpool(iter(response_body)) - # Check if this is a streaming response by looking at content-type - content_type = response.headers.get("content-type", "") - is_streaming = content_type == "text/event-stream; charset=utf-8" - - # Log response body based on type - if not response_body: - logger.info("response_body={}") - elif is_streaming: - _log_streaming_response(response, response_body) + # Determine if this is SSE streaming; preserve streaming semantics + content_type = response.headers.get("content-type", "").lower() + is_streaming = content_type.startswith("text/event-stream") + + original_iter = response.body_iterator + + if is_streaming: + response.body_iterator = _log_streaming_response(original_iter) else: - _log_non_streaming_response(response_body) + + async def tee_non_streaming(): + body_chunks: list[bytes] = [] + async for section in original_iter: + body_chunks.append(section) + yield section + if not body_chunks: + logger.info("response_body={}") + else: + _log_non_streaming_response(body_chunks) + + response.body_iterator = tee_non_streaming() return response for middleware in args.middleware: From 17f5efb9feffa26bfc4f7700fcf2dcfd717bded8 Mon Sep 17 00:00:00 2001 From: esmeetu Date: Tue, 4 Nov 2025 23:51:00 +0800 Subject: [PATCH 2/4] revert tuple Signed-off-by: esmeetu --- vllm/entrypoints/openai/api_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 96605b90bf0b..97a3a1e34154 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -817,7 +817,7 @@ async def create_pooling(request: PoolingRequest, raw_request: Request): return JSONResponse( content=generator.model_dump(), status_code=generator.error.code ) - elif isinstance(generator, PoolingResponse | IOProcessorResponse): + elif isinstance(generator, (PoolingResponse, IOProcessorResponse)): return JSONResponse(content=generator.model_dump()) elif isinstance(generator, PoolingBytesResponse): return StreamingResponse( @@ -1150,7 +1150,7 @@ async def collective_rpc(raw_request: Request): return Response(status_code=200) response: list[Any] = [] for result in results: - if result is None or isinstance(result, dict | list): + if result is None or isinstance(result, (dict, list)): response.append(result) else: response.append(str(result)) From 9ca90de9e62ebf501d690e76523ceabba6705927 Mon Sep 17 00:00:00 2001 From: esmeetu Date: Tue, 4 Nov 2025 23:57:49 +0800 Subject: [PATCH 3/4] apply suggestion Signed-off-by: esmeetu --- vllm/entrypoints/openai/api_server.py | 64 +++++++++++---------------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 97a3a1e34154..8f2fbf5badce 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -1553,11 +1553,27 @@ def _log_streaming_response( """Wrap an async body iterator to log SSE content while streaming.""" sse_decoder = SSEDecoder() - chunk_count = 0 - done_logged = False + + def _log_final_content(content: str, count: int): + """Helper to log the final content.""" + if content: + # Truncate if too long + if len(content) > 2048: + content = content[:2048] + logger.info( + "response_body={streaming_complete: content=%r, chunks=%d}", + content, + count, + ) + else: + logger.info( + "response_body={streaming_complete: no_content, chunks=%d}", + count, + ) async def generator(): - nonlocal chunk_count, done_logged + chunk_count = 0 + done_logged = False async for section in body_iterator: chunk_count += 1 try: @@ -1567,47 +1583,17 @@ async def generator(): content = sse_decoder.extract_content(event["data"]) sse_decoder.add_content(content) elif event["type"] == "done" and not done_logged: - full_content = sse_decoder.get_complete_content() - if full_content: - if len(full_content) > 2048: - full_content = full_content[:2048] - logger.info( - ( - "response_body={streaming_complete: content=%r, " - "chunks=%d}" - ), - full_content, - chunk_count, - ) - else: - logger.info( - ( - "response_body={streaming_complete: no_content, " - "chunks=%d}" - ), - chunk_count, - ) + _log_final_content( + sse_decoder.get_complete_content(), chunk_count + ) done_logged = True - except Exception: + except Exception as e: # Best-effort logging; never break streaming - pass + logger.warning("Error parsing response stream for logging: %s", e) yield section # In case no explicit DONE was received, log buffered content once if not done_logged: - full_content = sse_decoder.get_complete_content() - if full_content: - if len(full_content) > 2048: - full_content = full_content[:2048] - logger.info( - "response_body={streaming_complete: content=%r, chunks=%d}", - full_content, - chunk_count, - ) - else: - logger.info( - "response_body={streaming_complete: no_content, chunks=%d}", - chunk_count, - ) + _log_final_content(sse_decoder.get_complete_content(), chunk_count) return generator() From 148d894e12fe9472eb0e30e4d65ea5fe8aee07e7 Mon Sep 17 00:00:00 2001 From: esmeetu Date: Thu, 6 Nov 2025 21:35:44 +0800 Subject: [PATCH 4/4] revert blocking fix Signed-off-by: esmeetu --- vllm/entrypoints/openai/api_server.py | 111 ++++++++++++-------------- 1 file changed, 49 insertions(+), 62 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 8f2fbf5badce..cfd143552e2d 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -31,6 +31,7 @@ from fastapi.responses import JSONResponse, Response, StreamingResponse from prometheus_client import make_asgi_app from prometheus_fastapi_instrumentator import Instrumentator +from starlette.concurrency import iterate_in_threadpool from starlette.datastructures import URL, Headers, MutableHeaders, State from starlette.routing import Mount from starlette.types import ASGIApp, Message, Receive, Scope, Send @@ -1547,55 +1548,49 @@ def get_complete_content(self) -> str: return "".join(self.content_buffer) -def _log_streaming_response( - body_iterator: AsyncIterator[bytes], -) -> AsyncIterator[bytes]: - """Wrap an async body iterator to log SSE content while streaming.""" +def _log_streaming_response(response, response_body: list) -> None: + """Log streaming response with robust SSE parsing.""" + from starlette.concurrency import iterate_in_threadpool sse_decoder = SSEDecoder() + chunk_count = 0 - def _log_final_content(content: str, count: int): - """Helper to log the final content.""" - if content: - # Truncate if too long - if len(content) > 2048: - content = content[:2048] - logger.info( - "response_body={streaming_complete: content=%r, chunks=%d}", - content, - count, - ) - else: - logger.info( - "response_body={streaming_complete: no_content, chunks=%d}", - count, - ) + def buffered_iterator(): + nonlocal chunk_count - async def generator(): - chunk_count = 0 - done_logged = False - async for section in body_iterator: + for chunk in response_body: chunk_count += 1 - try: - events = sse_decoder.decode_chunk(section) - for event in events: - if event["type"] == "data": - content = sse_decoder.extract_content(event["data"]) - sse_decoder.add_content(content) - elif event["type"] == "done" and not done_logged: - _log_final_content( - sse_decoder.get_complete_content(), chunk_count + yield chunk + + # Parse SSE events from chunk + events = sse_decoder.decode_chunk(chunk) + + for event in events: + if event["type"] == "data": + content = sse_decoder.extract_content(event["data"]) + sse_decoder.add_content(content) + elif event["type"] == "done": + # Log complete content when done + full_content = sse_decoder.get_complete_content() + if full_content: + # Truncate if too long + if len(full_content) > 2048: + full_content = full_content[:2048] + "" + "...[truncated]" + logger.info( + "response_body={streaming_complete: content=%r, chunks=%d}", + full_content, + chunk_count, + ) + else: + logger.info( + "response_body={streaming_complete: no_content, chunks=%d}", + chunk_count, ) - done_logged = True - except Exception as e: - # Best-effort logging; never break streaming - logger.warning("Error parsing response stream for logging: %s", e) - yield section - # In case no explicit DONE was received, log buffered content once - if not done_logged: - _log_final_content(sse_decoder.get_complete_content(), chunk_count) + return - return generator() + response.body_iterator = iterate_in_threadpool(buffered_iterator()) + logger.info("response_body={streaming_started: chunks=%d}", len(response_body)) def _log_non_streaming_response(response_body: list) -> None: @@ -1677,27 +1672,19 @@ async def validation_exception_handler(_: Request, exc: RequestValidationError): @app.middleware("http") async def log_response(request: Request, call_next): response = await call_next(request) - # Determine if this is SSE streaming; preserve streaming semantics - content_type = response.headers.get("content-type", "").lower() - is_streaming = content_type.startswith("text/event-stream") - - original_iter = response.body_iterator - - if is_streaming: - response.body_iterator = _log_streaming_response(original_iter) + response_body = [section async for section in response.body_iterator] + response.body_iterator = iterate_in_threadpool(iter(response_body)) + # Check if this is a streaming response by looking at content-type + content_type = response.headers.get("content-type", "") + is_streaming = content_type == "text/event-stream; charset=utf-8" + + # Log response body based on type + if not response_body: + logger.info("response_body={}") + elif is_streaming: + _log_streaming_response(response, response_body) else: - - async def tee_non_streaming(): - body_chunks: list[bytes] = [] - async for section in original_iter: - body_chunks.append(section) - yield section - if not body_chunks: - logger.info("response_body={}") - else: - _log_non_streaming_response(body_chunks) - - response.body_iterator = tee_non_streaming() + _log_non_streaming_response(response_body) return response for middleware in args.middleware: