55import os
66import socket
77import traceback
8+
9+ from google .protobuf import any_pb2
10+ from google .rpc import code_pb2 , status_pb2 , error_details_pb2
11+ from grpc_status import rpc_status
812from abc import ABCMeta , abstractmethod
913from collections .abc import Iterator
1014from concurrent .futures import ThreadPoolExecutor
@@ -240,6 +244,21 @@ def check_instance(instance, callable_type) -> bool:
240244 return False
241245
242246
def get_grpc_status(err: str):
    """
    Create a grpc status object with the error details.

    Args:
        err: Error message stored as the message of the status.

    Returns:
        The result of ``rpc_status.to_status`` for a ``google.rpc.Status``
        carrying code ``INTERNAL``, message ``err`` and one packed
        ``DebugInfo`` detail.
    """
    # format_stack() describes the call stack at this point, not the traceback
    # of the exception being reported. Every entry it returns already ends
    # with a newline, so the entries are concatenated without a separator;
    # joining them with "\n" would put a blank line between all frames.
    debug_info = error_details_pb2.DebugInfo(detail="".join(traceback.format_stack()))

    details = any_pb2.Any()
    details.Pack(debug_info)

    status = status_pb2.Status(code=code_pb2.INTERNAL, message=err, details=[details])
    return rpc_status.to_status(status)
260+
261+
243262def exit_on_error (
244263 context : NumaflowServicerContext , err : str , parent : bool = False , update_context = True
245264):
@@ -255,8 +274,12 @@ def exit_on_error(
255274 the context with the error codes
256275 """
257276 if update_context :
258- context .set_code (grpc .StatusCode .UNKNOWN )
277+ # Create a status object with the error details
278+ grpc_status = get_grpc_status (err )
279+
280+ context .set_code (grpc .StatusCode .INTERNAL )
259281 context .set_details (err )
282+ context .set_trailing_metadata (grpc_status .trailing_metadata )
260283
261284 p = psutil .Process (os .getpid ())
262285 # If the parent flag is true, we exit from the parent process
@@ -267,15 +290,19 @@ def exit_on_error(
267290 p .kill ()
268291
269292
def update_context_err(context: NumaflowServicerContext, e: BaseException, err_msg: str):
    """
    Log the exception and record the failure on the gRPC context.

    Args:
        context: Servicer context that receives the status code, details and
            trailing metadata.
        e: Exception whose traceback and string form are logged at critical level.
        err_msg: Message used both as the context details and as the message
            of the status built by ``get_grpc_status``.
    """
    # Log the full traceback first, then the bare exception text.
    _LOGGER.critical(get_exception_traceback_str(e))
    _LOGGER.critical(str(e))

    # Build the rich status before touching the context.
    rich_status = get_grpc_status(err_msg)

    context.set_code(grpc.StatusCode.INTERNAL)
    context.set_details(err_msg)
    context.set_trailing_metadata(rich_status.trailing_metadata)
279306
280307
281308def get_exception_traceback_str (exc ) -> str :
@@ -284,12 +311,15 @@ def get_exception_traceback_str(exc) -> str:
284311 return file .getvalue ().rstrip ()
285312
286313
async def handle_async_error(
    context: NumaflowServicerContext, exception: BaseException, exception_type: str
):
    """
    Report an exception raised in an async server on the context, abort the
    call, and exit.

    Args:
        context: Servicer context of the failing call.
        exception: The exception that was raised.
        exception_type: Label placed before the exception's ``repr`` in the
            error message.
    """
    err_msg = f"{exception_type}: {exception!r}"
    update_context_err(context, exception, err_msg)

    # return_exceptions=True makes gather hand back an exception raised by the
    # abort awaitable as a result instead of propagating it, so execution
    # continues to the exit call below.
    abort_call = context.abort(grpc.StatusCode.INTERNAL, details=err_msg)
    await asyncio.gather(abort_call, return_exceptions=True)

    # The context was already populated by update_context_err above.
    exit_on_error(context=context, err=err_msg, parent=False, update_context=False)
0 commit comments