63 changes: 49 additions & 14 deletions container/log_processor.py
@@ -1053,7 +1053,19 @@ def send_batch():
if not current_batch:
return

logger.info(f"Sending batch of {len(current_batch)} events to CloudWatch")
# Calculate actual batch payload size for verification
actual_batch_size = sum(len(event['message'].encode('utf-8')) + 26 for event in current_batch)
overhead_bytes = len(current_batch) * 26
message_bytes = actual_batch_size - overhead_bytes
overhead_percentage = (overhead_bytes / actual_batch_size) * 100 if actual_batch_size > 0 else 0

logger.info(f"Sending batch of {len(current_batch)} events to CloudWatch: "
f"{actual_batch_size:,} bytes total "
f"({message_bytes:,} message + {overhead_bytes:,} overhead = {overhead_percentage:.1f}% overhead)")

# Debug log for batch size verification
logger.debug(f"Batch size verification: calculated={current_batch_size}, actual={actual_batch_size}, "
f"diff={abs(current_batch_size - actual_batch_size)}")

# Retry logic matching Vector: 3 attempts, 30 second max duration
max_retries = 3
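Context for the arithmetic in this hunk: the 26-byte constant is the fixed per-event overhead CloudWatch counts toward the PutLogEvents batch limits (1,048,576 bytes of payload and 10,000 events per call). A minimal standalone sketch of the same size math follows — the helper name and sample events are illustrative, not part of this change:

# Minimal sketch of the batch-size math used above. The 26-byte per-event
# overhead and the 1,048,576-byte / 10,000-event caps are documented
# PutLogEvents quotas; the helper name and sample events are illustrative.
MAX_BATCH_BYTES = 1_048_576
MAX_BATCH_EVENTS = 10_000
PER_EVENT_OVERHEAD = 26

def batch_payload_size(batch):
    """UTF-8 message bytes plus the fixed CloudWatch overhead per event."""
    return sum(len(e['message'].encode('utf-8')) + PER_EVENT_OVERHEAD
               for e in batch)

sample = [{'message': 'hello'}, {'message': 'wörld'}]  # 5 and 6 UTF-8 bytes
assert batch_payload_size(sample) == (5 + 26) + (6 + 26)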
@@ -1130,28 +1142,51 @@ def send_batch():

for event in events:
# Calculate event size (approximate)
event_size = len(event['message'].encode('utf-8')) + 26 # 26 bytes overhead per event
message_bytes = len(event['message'].encode('utf-8'))
overhead_bytes = 26 # 26 bytes overhead per event
event_size = message_bytes + overhead_bytes

# Enhanced logging for byte calculations (debug level)
logger.debug(f"Event {events_processed + 1}: message_bytes={message_bytes}, "
f"overhead_bytes={overhead_bytes}, total_bytes={event_size}")

# Check if adding this event would exceed limits BEFORE adding it
would_exceed_size = (current_batch_size + event_size) > max_bytes_per_batch
would_exceed_count = (len(current_batch) + 1) > max_events_per_batch
timeout_reached = (time.time() - batch_start_time) >= timeout_secs

# Send current batch if adding this event would exceed limits
if current_batch and (would_exceed_size or would_exceed_count or timeout_reached):
send_reason = []
if would_exceed_size:
send_reason.append(f"would_exceed_size: {current_batch_size}+{event_size}>{max_bytes_per_batch}")
if would_exceed_count:
send_reason.append(f"would_exceed_count: {len(current_batch)}+1>{max_events_per_batch}")
if timeout_reached:
send_reason.append(f"timeout={time.time() - batch_start_time:.1f}s>={timeout_secs}s")

logger.debug(f"Sending batch before adding event: {len(current_batch)} events, {current_batch_size} bytes. "
f"Trigger: {', '.join(send_reason)}")

send_batch()
batch_start_time = time.time()

# Add event to current batch first
# Now add the event to the (possibly new) batch
current_batch.append(event)
current_batch_size += event_size
events_processed += 1

# Check if we need to send current batch after adding this event
should_send = (
len(current_batch) >= max_events_per_batch or
current_batch_size > max_bytes_per_batch or
(time.time() - batch_start_time) >= timeout_secs
)

if should_send:
send_batch()
batch_start_time = time.time()

# Send final batch
if current_batch:
logger.debug(f"Sending final batch: {len(current_batch)} events, {current_batch_size} bytes")
send_batch()

# Log overall batching efficiency summary
total_events = successful_events + failed_events
if total_events > 0:
logger.info(f"CloudWatch batching complete: {total_events:,} events processed "
f"({successful_events:,} successful, {failed_events:,} failed)")

return {
'successful_events': successful_events,
'failed_events': failed_events,
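The second hunk's key behavioral change is the ordering: limits are now checked before an event is appended, so a batch can no longer be flushed over-size or over-count. A condensed, self-contained sketch of that check-before-add pattern — variable names mirror the diff, while the default timeout value and the send_batch signature are assumptions for illustration:

# Condensed sketch of the reworked loop: limits are checked BEFORE an event
# is appended, so a batch cannot be sent over the size or count caps.
# Names mirror the diff; send_batch() is assumed to take the batch and
# flush it, and the 5-second default timeout is an assumption.
import time

def batch_events(events, send_batch, max_bytes_per_batch=1_048_576,
                 max_events_per_batch=10_000, timeout_secs=5.0):
    current_batch, current_batch_size = [], 0
    batch_start_time = time.time()

    def flush():
        nonlocal current_batch, current_batch_size, batch_start_time
        if current_batch:
            send_batch(current_batch)
        current_batch, current_batch_size = [], 0
        batch_start_time = time.time()

    for event in events:
        event_size = len(event['message'].encode('utf-8')) + 26
        # Flush first if adding this event would exceed any limit.
        if current_batch and (
            current_batch_size + event_size > max_bytes_per_batch
            or len(current_batch) + 1 > max_events_per_batch
            or time.time() - batch_start_time >= timeout_secs
        ):
            flush()
        current_batch.append(event)
        current_batch_size += event_size
    flush()  # final partial batch

batch_events([{'message': 'hello'}],
             send_batch=lambda b: print(len(b), 'events flushed'))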