@@ -216,41 +216,43 @@ def _process_batch_with_retry(
216216 ** kwargs ,
217217 ) -> Iterator [list [core_types .ScoredOutput ]]:
218218 """Process a batch of prompts with individual chunk retry capability.
219-
219+
220220 This method processes each chunk individually and retries failed chunks
221221 due to transient errors (like 503 "model overloaded") while preserving
222222 successful chunks from the same batch.
223-
223+
224224 Args:
225225 batch_prompts: List of prompts for the batch
226226 batch: List of TextChunk objects corresponding to the prompts
227227 **kwargs: Additional arguments passed to the language model
228-
228+
229229 Yields:
230230 Lists of ScoredOutputs, with retries for failed chunks
231231 """
232232 try :
233- batch_results = list (self ._language_model .infer (
234- batch_prompts = batch_prompts ,
235- ** kwargs ,
236- ))
237-
233+ batch_results = list (
234+ self ._language_model .infer (
235+ batch_prompts = batch_prompts ,
236+ ** kwargs ,
237+ )
238+ )
239+
238240 for result in batch_results :
239241 yield result
240242 return
241-
243+
242244 except Exception as e :
243245 if not retry_utils .is_transient_error (e ):
244246 raise
245-
247+
246248 logging .warning (
247249 "Batch processing failed with transient error: %s. "
248250 "Falling back to individual chunk processing with retry." ,
249- str (e )
251+ str (e ),
250252 )
251-
253+
252254 individual_results = []
253-
255+
254256 for i , (prompt , chunk ) in enumerate (zip (batch_prompts , batch )):
255257 try :
256258 chunk_result = self ._process_single_chunk_with_retry (
@@ -264,16 +266,19 @@ def _process_batch_with_retry(
264266 ** kwargs ,
265267 )
266268 individual_results .append (chunk_result )
267-
269+
268270 except Exception as e :
269271 logging .error (
270272 "Failed to process chunk %d after retries: %s. "
271273 "Chunk info: document_id=%s, text_length=%d. "
272274 "Stopping document processing." ,
273- i , str (e ), chunk .document_id , len (chunk .chunk_text )
275+ i ,
276+ str (e ),
277+ chunk .document_id ,
278+ len (chunk .chunk_text ),
274279 )
275280 raise
276-
281+
277282 for result in individual_results :
278283 yield result
279284
@@ -289,7 +294,7 @@ def _process_single_chunk_with_retry(
289294 ** kwargs ,
290295 ) -> list [core_types .ScoredOutput ]:
291296 """Process a single chunk with retry logic.
292-
297+
293298 Args:
294299 prompt: The prompt for this chunk
295300 chunk: The TextChunk object
@@ -299,59 +304,69 @@ def _process_single_chunk_with_retry(
299304 retry_backoff_factor: Backoff multiplier for retries
300305 retry_max_delay: Maximum delay between retries
301306 **kwargs: Additional arguments for the language model
302-
307+
303308 Returns:
304309 List containing a single ScoredOutput for this chunk
305310 """
306311 last_exception = None
307312 delay = retry_initial_delay
308-
313+
309314 for attempt in range (max_retries + 1 ):
310315 try :
311- batch_results = list (self ._language_model .infer (
312- batch_prompts = [prompt ],
313- ** kwargs ,
314- ))
315-
316+ batch_results = list (
317+ self ._language_model .infer (
318+ batch_prompts = [prompt ],
319+ ** kwargs ,
320+ )
321+ )
322+
316323 if not batch_results :
317324 raise exceptions .InferenceOutputError (
318325 f"No results returned for chunk in document { chunk .document_id } "
319326 )
320-
327+
321328 return batch_results [0 ]
322-
329+
323330 except Exception as e :
324331 last_exception = e
325-
332+
326333 if not retry_transient_errors or not retry_utils .is_transient_error (e ):
327334 logging .debug (
328- "Not retrying chunk processing: retry_disabled=%s, is_transient=%s, error=%s" ,
329- not retry_transient_errors , retry_utils .is_transient_error (e ), str (e )
335+ "Not retrying chunk processing: retry_disabled=%s,"
336+ " is_transient=%s, error=%s" ,
337+ not retry_transient_errors ,
338+ retry_utils .is_transient_error (e ),
339+ str (e ),
330340 )
331341 raise
332-
342+
333343 if attempt >= max_retries :
334344 logging .error (
335345 "Chunk processing failed after %d retries: %s" ,
336- max_retries , str (e )
346+ max_retries ,
347+ str (e ),
337348 )
338349 raise
339-
350+
340351 current_delay = min (delay , retry_max_delay )
341-
352+
342353 import random
354+
343355 jitter_amount = current_delay * 0.1 * random .random ()
344356 current_delay += jitter_amount
345-
357+
346358 logging .warning (
347- "Chunk processing failed on attempt %d/%d due to transient error: %s. "
348- "Retrying in %.2f seconds..." ,
349- attempt + 1 , max_retries + 1 , str (e ), current_delay
359+ "Chunk processing failed on attempt %d/%d due to transient error:"
360+ " %s. Retrying in %.2f seconds..." ,
361+ attempt + 1 ,
362+ max_retries + 1 ,
363+ str (e ),
364+ current_delay ,
350365 )
351-
366+
352367 time .sleep (current_delay )
353368 delay = min (delay * retry_backoff_factor , retry_max_delay )
354-
369+
355370 if last_exception :
356371 raise last_exception
357372 raise RuntimeError ("Chunk retry logic failed unexpectedly" )
0 commit comments