application_sdk/io/parquet.py (6 changes: 5 additions & 1 deletion)
@@ -15,7 +15,11 @@
 from temporalio import activity
 
 from application_sdk.activities.common.utils import get_object_store_prefix
-from application_sdk.constants import DAPR_MAX_GRPC_MESSAGE_LENGTH
+from application_sdk.constants import (
+    DAPR_MAX_GRPC_MESSAGE_LENGTH,
+    ENABLE_ATLAN_UPLOAD,
+    UPSTREAM_OBJECT_STORE_NAME,
+)
 from application_sdk.io import DataframeType, Reader, WriteMode, Writer
 from application_sdk.io._utils import (
     PARQUET_FILE_EXTENSION,
application_sdk/observability/observability.py (31 changes: 17 additions & 14 deletions)
@@ -364,14 +364,14 @@ async def parquet_sink(self, message: Any):
             logging.error(f"Error buffering log: {e}")
 
     async def _flush_records(self, records: List[Dict[str, Any]]):
-        """Flush records to parquet file and object store using ParquetOutput abstraction.
+        """Flush records to parquet file and object store using ParquetFileWriter.
 
         Args:
             records: List of records to flush
 
         This method:
         - Groups records by partition (year/month/day)
-        - Uses ParquetOutput abstraction for efficient writing
+        - Uses ParquetFileWriter for efficient writing
         - Automatically handles chunking, compression, and dual upload
         - Provides robust error handling per partition
         - Cleans up old records if enabled
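For context, a minimal sketch of the partition grouping the docstring above describes: records are bucketed into year=/month=/day= paths before each bucket is written out. The helper name and the "timestamp" field are assumptions for illustration, not taken from this diff.

from datetime import datetime, timezone
from typing import Any, Dict, List


def group_by_partition(records: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Bucket records into year=/month=/day= partition paths (illustrative only)."""
    partition_records: Dict[str, List[Dict[str, Any]]] = {}
    for record in records:
        # Assumed: each record carries a "timestamp" field in epoch seconds (UTC).
        ts = datetime.fromtimestamp(record["timestamp"], tz=timezone.utc)
        partition_path = f"year={ts.year}/month={ts.month:02d}/day={ts.day:02d}"
        if partition_path not in partition_records:
            partition_records[partition_path] = []
        partition_records[partition_path].append(record)
    return partition_records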
@@ -396,7 +396,7 @@ async def _flush_records(self, records: List[Dict[str, Any]]):
                 partition_records[partition_path] = []
             partition_records[partition_path].append(record)
 
-        # Write records to each partition using ParquetOutput abstraction
+        # Write records to each partition using ParquetFileWriter
         for partition_path, partition_data in partition_records.items():
             # Create new dataframe from current records
             new_df = pd.DataFrame(partition_data)
@@ -413,25 +413,28 @@
                 elif part.startswith("day="):
                     new_df["day"] = int(part.split("=")[1])
 
-            # Use new data directly - let ParquetOutput handle consolidation and merging
+            # Use new data directly - let ParquetFileWriter handle consolidation and merging
             df = new_df
 
-            # Use ParquetOutput abstraction for efficient writing and uploading
+            # Use ParquetFileWriter for efficient writing and uploading
             # Set the output path for this partition
             try:
-                # Lazy import and instantiation of ParquetOutput
-                from application_sdk.outputs.parquet import ParquetOutput
-
-                parquet_output = ParquetOutput(output_path=partition_path)
+                # Lazy import and instantiation of ParquetFileWriter
+                from application_sdk.io import DataframeType, WriteMode
+                from application_sdk.io.parquet import ParquetFileWriter
+
+                parquet_writer = ParquetFileWriter(
+                    output_path=partition_path,
+                    dataframe_type=DataframeType.daft,
+                    use_consolidation=True,
+                )
                 logging.info(
-                    f"Successfully instantiated ParquetOutput for partition: {partition_path}"
+                    f"Successfully instantiated ParquetFileWriter for partition: {partition_path}"
                 )
 
-                # Use write_daft_dataframe with the DataFrame we have
-                from application_sdk.outputs.parquet import WriteMode
-
+                # Use _write_daft_dataframe with the DataFrame we have
                 daft_df = daft.from_pandas(df)
-                await parquet_output.write_daft_dataframe(
+                await parquet_writer._write_daft_dataframe(
                     dataframe=daft_df,
                     write_mode=WriteMode.APPEND,  # Append mode to merge with existing data
                 )

Member Author (inline review comment on the _write_daft_dataframe call): we must use the write() method with dataframeType as daft.
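The reviewer's comment points at the public API rather than the private helper. A minimal sketch of what that might look like, assuming write() is async and accepts a dataframe plus a write_mode; the exact signature is not shown in this diff.

import daft
import pandas as pd

from application_sdk.io import DataframeType, WriteMode
from application_sdk.io.parquet import ParquetFileWriter


async def flush_partition(df: pd.DataFrame, partition_path: str) -> None:
    # Writer configured for daft dataframes, as in the diff above.
    parquet_writer = ParquetFileWriter(
        output_path=partition_path,
        dataframe_type=DataframeType.daft,
        use_consolidation=True,
    )
    daft_df = daft.from_pandas(df)
    # Assumed call shape for the public write() method; the private
    # _write_daft_dataframe() call in the diff would be replaced by this.
    await parquet_writer.write(
        dataframe=daft_df,
        write_mode=WriteMode.APPEND,  # append so new records merge with existing data
    )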