 from application_sdk.observability.logger_adaptor import get_logger
 from application_sdk.observability.metrics_adaptor import MetricType
 from application_sdk.services.objectstore import ObjectStore
-from application_sdk.constants import TEMPORARY_PATH
-from application_sdk.decorators.method_lock import lock_per_run
 
 logger = get_logger(__name__)
 activity.logger = logger
@@ -423,10 +421,23 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic
423421 "partitions" : self .partitions ,
424422 }
425423
-            # Write the statistics to a json file
-            output_file_name = f"{self.output_path}/statistics.json.ignore"
-            with open(output_file_name, "w") as f:
-                f.write(orjson.dumps(statistics).decode("utf-8"))
+            # Ensure typename is included in the statistics payload (if provided)
+            if typename:
+                statistics["typename"] = typename
+
+            # Write the statistics to a JSON file inside a dedicated statistics/ folder
+            statistics_dir = os.path.join(self.output_path, "statistics")
+            os.makedirs(statistics_dir, exist_ok=True)
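+            # exist_ok=True keeps the makedirs call safe when several chunks write into the same run folder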
+            output_file_name = f"{statistics_dir}/statistics.json.ignore"
+            # If chunk_start is set, include it in the filename so per-chunk
+            # statistics files do not overwrite each other
+            chunk_start = getattr(self, "chunk_start", None)
+            if chunk_start is not None:
+                output_file_name = f"{statistics_dir}/statistics-chunk-{chunk_start}.json.ignore"
+            # Write the payload locally; the object store upload below reads this file
+            with open(output_file_name, "w") as f:
+                f.write(orjson.dumps(statistics).decode("utf-8"))
 
             destination_file_path = get_object_store_prefix(output_file_name)
             # Push the file to the object store
@@ -435,83 +446,7 @@ async def write_statistics(self, typename: Optional[str] = None) -> Optional[Dic
                 destination=destination_file_path,
             )
 
-            if typename:
-                statistics["typename"] = typename
-            # Update aggregated statistics at run root in object store
-            try:
-                await self._update_run_aggregate(destination_file_path, statistics)
-            except Exception as e:
-                logger.warning(f"Failed to update aggregated statistics: {str(e)}")
             return statistics
         except Exception as e:
             logger.error(f"Error writing statistics: {str(e)}")
 
-    #TODO Do we need locking here ?
-    @lock_per_run()
-    async def _update_run_aggregate(
-        self, per_path_destination: str, statistics: Dict[str, Any]
-    ) -> None:
-        """Aggregate stats into a single file at the workflow run root.
-
-        Args:
-            per_path_destination: Object store destination path for this stats file
-                (used as key in the aggregate map)
-            statistics: The statistics dictionary to store
-        """
-        inferred_phase = self._infer_phase_from_path()
-        if inferred_phase is None:
-            logger.info("Phase could not be inferred from path. Skipping aggregation.")
-            return
-
-        logger.info(f"Starting _update_run_aggregate for phase: {inferred_phase}")
-        workflow_run_root_relative = build_output_path()
-        output_file_name = f"{TEMPORARY_PATH}{workflow_run_root_relative}/statistics.json.ignore"
-        destination_file_path = get_object_store_prefix(output_file_name)
-
-        # Load existing aggregate from object store if present
-        # Structure: {"Extract": {"typename": {"record_count": N}}, "Transform": {...}, "Publish": {...}}
-        aggregate_by_phase: Dict[str, Dict[str, Dict[str, Any]]] = {
-            "Extract": {},
-            "Transform": {},
-            "Publish": {}
-        }
-
-        try:
-            # Download existing aggregate file if present
-            await ObjectStore.download_file(
-                source=destination_file_path,
-                destination=output_file_name,
-            )
-            # Load existing JSON structure
-            with open(output_file_name, "r") as f:
-                existing_aggregate = orjson.loads(f.read())
-            # Phase-based structure
-            aggregate_by_phase.update(existing_aggregate)
-            logger.info("Successfully loaded existing aggregates")
-        except Exception:
-            logger.info(
-                "No existing aggregate found or failed to read. Initializing a new aggregate structure."
-            )
-
-        # Accumulate statistics by typename within the phase
-        typename = statistics.get("typename", "unknown")
-
-        if typename not in aggregate_by_phase[inferred_phase]:
-            aggregate_by_phase[inferred_phase][typename] = {
-                "record_count": 0
-            }
-
-        logger.info(f"Accumulating statistics for phase '{inferred_phase}', typename '{typename}': +{statistics['total_record_count']} records")
-
-        # Accumulate the record count
-        aggregate_by_phase[inferred_phase][typename]["record_count"] += statistics["total_record_count"]
-
-        with open(output_file_name, "w") as f:
-            f.write(orjson.dumps(aggregate_by_phase).decode("utf-8"))
-        logger.info(f"Successfully updated aggregate with accumulated stats for phase '{inferred_phase}'")
-
-        # Upload aggregate to object store
-        await ObjectStore.upload_file(
-            source=output_file_name,
-            destination=destination_file_path,
-        )