@@ -25,6 +25,22 @@ def __init__(self, handler: MapSyncCallable, multiproc: bool = False):
2525 self .multiproc = multiproc
2626 # create a thread pool for executing UDF code
2727 self .executor = ThreadPoolExecutor (max_workers = NUM_THREADS_DEFAULT )
28+ # Thread-safe event to track shutdown state and prevent race conditions
29+ self ._shutdown_event = threading .Event ()
30+ self ._shutdown_lock = threading .Lock () # NEW: lock for shutdown/error handling
31+
32+ def _handle_error (self , context , error ):
33+ """
34+ Ensures only one thread triggers shutdown and error reporting.
35+ """
36+ with self ._shutdown_lock :
37+ if not self ._shutdown_event .is_set ():
38+ self ._shutdown_event .set ()
39+ exit_on_error (
40+ context , f"{ ERR_UDF_EXCEPTION_STRING } : { repr (error )} " , parent = self .multiproc
41+ )
42+ else :
43+ _LOGGER .info ("Shutdown already initiated by another thread, exiting quietly" )
2844
2945 def MapFn (
3046 self ,
@@ -56,10 +72,7 @@ def MapFn(
5672 for res in result_queue .read_iterator ():
5773 # if error handler accordingly
5874 if isinstance (res , BaseException ):
59- # Terminate the current server process due to exception
60- exit_on_error (
61- context , f"{ ERR_UDF_EXCEPTION_STRING } : { repr (res )} " , parent = self .multiproc
62- )
75+ self ._handle_error (context , res )
6376 return
6477 # return the result
6578 yield res
@@ -70,10 +83,7 @@ def MapFn(
7083
7184 except BaseException as err :
7285 _LOGGER .critical ("UDFError, re-raising the error" , exc_info = True )
73- # Terminate the current server process due to exception
74- exit_on_error (
75- context , f"{ ERR_UDF_EXCEPTION_STRING } : { repr (err )} " , parent = self .multiproc
76- )
86+ self ._handle_error (context , err )
7787 return
7888
7989 def _process_requests (
@@ -86,6 +96,10 @@ def _process_requests(
8696 # read through all incoming requests and submit to the
8797 # threadpool for invocation
8898 for request in request_iterator :
99+ # Check if shutdown has been initiated before submitting new tasks
100+ if self ._shutdown_event .is_set ():
101+ _LOGGER .info ("Shutdown initiated, stopping request processing" )
102+ break
89103 _ = self .executor .submit (self ._invoke_map , context , request , result_queue )
90104 # wait for all tasks to finish after all requests exhausted
91105 self .executor .shutdown (wait = True )
@@ -101,6 +115,11 @@ def _invoke_map(
101115 result_queue : SyncIterator ,
102116 ):
103117 try :
118+ # Check if shutdown has been initiated before processing
119+ if self ._shutdown_event .is_set ():
120+ _LOGGER .info ("Shutdown initiated, skipping map invocation" )
121+ return
122+
104123 d = Datum (
105124 keys = list (request .request .keys ),
106125 value = request .request .value ,
@@ -123,7 +142,11 @@ def _invoke_map(
123142
124143 except BaseException as e :
125144 _LOGGER .critical ("MapFn handler error" , exc_info = True )
126- result_queue .put (e )
145+ # Only put the exception in the queue if shutdown hasn't been initiated
146+ if not self ._shutdown_event .is_set ():
147+ result_queue .put (e )
148+ else :
149+ _LOGGER .info ("Shutdown already initiated, not queuing additional error" )
127150 return
128151
129152 def IsReady (
0 commit comments