Skip to content
Open
3,086 changes: 1,543 additions & 1,543 deletions docs/utilities/batch-processing.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,26 @@
// Validate typed handler configurations
ValidateTypedHandlerConfiguration();

// Check if typed handlers are configured (not yet fully supported in attributes)
// Check if typed handlers are configured
if (IsTypedHandlerConfigured())
{
throw new NotSupportedException("Typed record handlers are not yet fully supported with BatchProcessorAttribute. Please use direct typed batch processor calls for typed processing.");
// Create typed aspect handler
return eventType switch
{
BatchEventType.DynamoDbStream => CreateTypedBatchProcessingAspectHandler(() => TypedDynamoDbStreamBatchProcessor.TypedInstance, args),
BatchEventType.KinesisDataStream => CreateTypedBatchProcessingAspectHandler(() => TypedKinesisEventBatchProcessor.TypedInstance, args),
BatchEventType.Sqs => CreateTypedBatchProcessingAspectHandler(() => TypedSqsBatchProcessor.TypedInstance, args),
_ => throw new ArgumentOutOfRangeException($"{eventType}", eventType, "Unsupported event type.")
};
}

// Create aspect handler
// Create traditional aspect handler
return eventType switch
{
BatchEventType.DynamoDbStream => CreateBatchProcessingAspectHandler(() => DynamoDbStreamBatchProcessor.Instance),
BatchEventType.KinesisDataStream => CreateBatchProcessingAspectHandler(() => KinesisEventBatchProcessor.Instance),
BatchEventType.Sqs => CreateBatchProcessingAspectHandler(() => SqsBatchProcessor.Instance),
_ => throw new ArgumentOutOfRangeException(nameof(eventType), eventType, "Unsupported event type.")
_ => throw new ArgumentOutOfRangeException($"{eventType}", eventType, "Unsupported event type.")
};
}

Expand Down Expand Up @@ -395,6 +402,146 @@
});
}

private TypedBatchProcessingAspectHandler<TEvent, TRecord> CreateTypedBatchProcessingAspectHandler<TEvent, TRecord>(Func<ITypedBatchProcessor<TEvent, TRecord>> defaultTypedBatchProcessorProvider, IReadOnlyList<object> args)
{
// Create typed batch processor
ITypedBatchProcessor<TEvent, TRecord> typedBatchProcessor;
if (BatchProcessor != null && BatchProcessor.IsAssignableTo(TypedBatchProcessorTypes[GetEventTypeFromArgs(args)]))
{
try
{
typedBatchProcessor = (ITypedBatchProcessor<TEvent, TRecord>)Activator.CreateInstance(BatchProcessor)!;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of: '{BatchProcessor.Name}'.", ex);
}
}
else
{
typedBatchProcessor = defaultTypedBatchProcessorProvider.Invoke();
}

// Create deserialization options
var deserializationOptions = new DeserializationOptions
{
ErrorPolicy = DeserializationErrorPolicy
};

if (JsonSerializerContext != null)
{
try
{
var jsonSerializerContext = (JsonSerializerContext)Activator.CreateInstance(JsonSerializerContext)!;
deserializationOptions.JsonSerializerContext = jsonSerializerContext;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of JsonSerializerContext: '{JsonSerializerContext.Name}'.", ex);
}
}

// Create processing options
var errorHandlingPolicy = Enum.TryParse(PowertoolsConfigurations.Instance.BatchProcessingErrorHandlingPolicy, true, out BatchProcessorErrorHandlingPolicy errHandlingPolicy)
? errHandlingPolicy
: ErrorHandlingPolicy;
if (ErrorHandlingPolicy != BatchProcessorErrorHandlingPolicy.DeriveFromEvent)
{
errorHandlingPolicy = ErrorHandlingPolicy;
}

var processingOptions = new ProcessingOptions
{
CancellationToken = CancellationToken.None,
ErrorHandlingPolicy = errorHandlingPolicy,
MaxDegreeOfParallelism = MaxDegreeOfParallelism,
BatchParallelProcessingEnabled = BatchParallelProcessingEnabled,
ThrowOnFullBatchFailure = ThrowOnFullBatchFailure
};

// Create typed handler wrapper
object typedHandler = null;
bool hasContext = false;

if (TypedRecordHandler != null)
{
try
{
typedHandler = Activator.CreateInstance(TypedRecordHandler)!;
hasContext = false;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of: '{TypedRecordHandler.Name}'.", ex);
}
}
else if (TypedRecordHandlerProvider != null)
{
try
{
var provider = Activator.CreateInstance(TypedRecordHandlerProvider)!;
// Assume the provider has a Create() method that returns the handler
var createMethod = TypedRecordHandlerProvider.GetMethod("Create");

Check warning on line 484 in libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs

View workflow job for this annotation

GitHub Actions / build

'this' argument does not satisfy 'DynamicallyAccessedMemberTypes.PublicMethods' in call to 'System.Type.GetMethod(String)'. The return value of method 'AWS.Lambda.Powertools.BatchProcessing.BatchProcessorAttribute.TypedRecordHandlerProvider.get' does not have matching annotations. The source value must declare at least the same requirements as those declared on the target location it is assigned to.
if (createMethod == null)
{
throw new InvalidOperationException($"TypedRecordHandlerProvider '{TypedRecordHandlerProvider.Name}' must have a 'Create()' method.");
}
typedHandler = createMethod.Invoke(provider, null)!;
hasContext = false;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of typed record handler using provider: '{TypedRecordHandlerProvider.Name}'.", ex);
}
}
else if (TypedRecordHandlerWithContext != null)
{
try
{
typedHandler = Activator.CreateInstance(TypedRecordHandlerWithContext)!;
hasContext = true;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of: '{TypedRecordHandlerWithContext.Name}'.", ex);
}
}
else if (TypedRecordHandlerWithContextProvider != null)
{
try
{
var provider = Activator.CreateInstance(TypedRecordHandlerWithContextProvider)!;
// Assume the provider has a Create() method that returns the handler
var createMethod = TypedRecordHandlerWithContextProvider.GetMethod("Create");

Check warning on line 515 in libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs

View workflow job for this annotation

GitHub Actions / build

'this' argument does not satisfy 'DynamicallyAccessedMemberTypes.PublicMethods' in call to 'System.Type.GetMethod(String)'. The return value of method 'AWS.Lambda.Powertools.BatchProcessing.BatchProcessorAttribute.TypedRecordHandlerWithContextProvider.get' does not have matching annotations. The source value must declare at least the same requirements as those declared on the target location it is assigned to.
if (createMethod == null)
{
throw new InvalidOperationException($"TypedRecordHandlerWithContextProvider '{TypedRecordHandlerWithContextProvider.Name}' must have a 'Create()' method.");
}
typedHandler = createMethod.Invoke(provider, null)!;
hasContext = true;
}
catch (Exception ex)
{
throw new InvalidOperationException($"Error during creation of typed record handler with context using provider: '{TypedRecordHandlerWithContextProvider.Name}'.", ex);
}
}
else
{
throw new InvalidOperationException("A typed record handler or typed record handler provider is required.");
}

return new TypedBatchProcessingAspectHandler<TEvent, TRecord>(typedBatchProcessor, typedHandler, hasContext, deserializationOptions, processingOptions);
}

private static BatchEventType GetEventTypeFromArgs(IReadOnlyList<object> args)
{
if (args == null || args.Count == 0 || !EventTypes.TryGetValue(args[0].GetType(), out var eventType))
{
throw new ArgumentException($"The first function handler parameter must be of one of the following types: {string.Join(',', EventTypes.Keys.Select(x => $"'{x.Namespace}'"))}.");
}
return eventType;
}

private void ValidateTypedHandlerConfiguration()
{
// Ensure only one type of handler is configured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Amazon.Lambda.DynamoDBEvents;
using AWS.Lambda.Powertools.BatchProcessing.Exceptions;
using AWS.Lambda.Powertools.BatchProcessing.Internal;
using AWS.Lambda.Powertools.Common;

namespace AWS.Lambda.Powertools.BatchProcessing.DynamoDb;

Expand All @@ -18,19 +19,66 @@ public class TypedDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor, I
private readonly IDeserializationService _deserializationService;
private readonly IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> _recordDataExtractor;

/// <summary>
/// The singleton instance of the typed DynamoDB stream batch processor.
/// </summary>
private static ITypedBatchProcessor<DynamoDBEvent, DynamoDBEvent.DynamodbStreamRecord> _typedInstance;

/// <summary>
/// Gets the typed instance.
/// </summary>
/// <value>The typed instance.</value>
public static ITypedBatchProcessor<DynamoDBEvent, DynamoDBEvent.DynamodbStreamRecord> TypedInstance =>
_typedInstance ??= new TypedDynamoDbStreamBatchProcessor();

/// <summary>
/// Return the typed instance ProcessingResult
/// </summary>
public new static ProcessingResult<DynamoDBEvent.DynamodbStreamRecord> Result => _typedInstance?.ProcessingResult;


/// <summary>
/// Initializes a new instance of the TypedDynamoDbStreamBatchProcessor class.
/// </summary>
/// <param name="powertoolsConfigurations">The Powertools configurations.</param>
/// <param name="deserializationService">The deserialization service. If null, uses JsonDeserializationService.Instance.</param>
/// <param name="recordDataExtractor">The record data extractor. If null, uses DynamoDbRecordDataExtractor.Instance.</param>
public TypedDynamoDbStreamBatchProcessor(IDeserializationService deserializationService = null,
public TypedDynamoDbStreamBatchProcessor(
IPowertoolsConfigurations powertoolsConfigurations,
IDeserializationService deserializationService = null,
IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> recordDataExtractor = null)
{
_deserializationService = deserializationService ?? JsonDeserializationService.Instance;
_recordDataExtractor = recordDataExtractor ?? DynamoDbRecordDataExtractor.Instance;
}

/// <summary>
/// Initializes a new instance of the TypedDynamoDbStreamBatchProcessor class with custom deserialization service.
/// </summary>
/// <param name="deserializationService">The deserialization service. If null, uses JsonDeserializationService.Instance.</param>
public TypedDynamoDbStreamBatchProcessor(IDeserializationService deserializationService)
: this(PowertoolsConfigurations.Instance, deserializationService, null)
{
}

/// <summary>
/// Initializes a new instance of the TypedDynamoDbStreamBatchProcessor class with custom services.
/// </summary>
/// <param name="deserializationService">The deserialization service. If null, uses JsonDeserializationService.Instance.</param>
/// <param name="recordDataExtractor">The record data extractor. If null, uses DynamoDbRecordDataExtractor.Instance.</param>
public TypedDynamoDbStreamBatchProcessor(IDeserializationService deserializationService,
IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> recordDataExtractor)
: this(PowertoolsConfigurations.Instance, deserializationService, recordDataExtractor)
{
}

/// <summary>
/// Default constructor for when consumers create a custom typed batch processor.
/// </summary>
public TypedDynamoDbStreamBatchProcessor() : this(PowertoolsConfigurations.Instance)
{
}

/// <inheritdoc />
public async Task<ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>> ProcessAsync<T>(
DynamoDBEvent @event,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

using System;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Amazon.Lambda.Core;

namespace AWS.Lambda.Powertools.BatchProcessing.Internal;

internal class TypedBatchProcessingAspectHandler<TEvent, TRecord> : IBatchProcessingAspectHandler
{
private readonly ITypedBatchProcessor<TEvent, TRecord> _typedBatchProcessor;
private readonly object _typedHandler;
private readonly bool _hasContext;
private readonly DeserializationOptions _deserializationOptions;
private readonly ProcessingOptions _processingOptions;

public TypedBatchProcessingAspectHandler(
ITypedBatchProcessor<TEvent, TRecord> typedBatchProcessor,
object typedHandler,
bool hasContext,
DeserializationOptions deserializationOptions,
ProcessingOptions processingOptions)
{
_typedBatchProcessor = typedBatchProcessor;
_typedHandler = typedHandler;
_hasContext = hasContext;
_deserializationOptions = deserializationOptions;
_processingOptions = processingOptions;
}

public async Task HandleAsync(object[] args)
{
// Try get event from args
if (args?.FirstOrDefault() is not TEvent @event)
{
throw new InvalidOperationException($"The first function handler parameter must be of type: '{typeof(TEvent).Namespace}'.");
}

// Get Lambda context if available and needed
ILambdaContext context = null;
if (_hasContext && args.Length > 1 && args[1] is ILambdaContext lambdaContext)
{
context = lambdaContext;
}

// Use reflection to call the appropriate ProcessAsync method on the typed batch processor
await CallTypedProcessAsync(@event, context);
}

private async Task CallTypedProcessAsync(TEvent @event, ILambdaContext context)
{
// Get the generic type argument from the handler
var handlerType = _typedHandler.GetType();
var handlerInterface = handlerType.GetInterfaces()
.FirstOrDefault(i => i.IsGenericType &&
(i.GetGenericTypeDefinition() == typeof(ITypedRecordHandler<>) ||
i.GetGenericTypeDefinition() == typeof(ITypedRecordHandlerWithContext<>)));

if (handlerInterface == null)
{
throw new InvalidOperationException($"Handler type '{handlerType.Name}' does not implement ITypedRecordHandler<T> or ITypedRecordHandlerWithContext<T>.");
}

var dataType = handlerInterface.GetGenericArguments()[0];

// Find the appropriate ProcessAsync method on the typed batch processor
MethodInfo processMethod;
if (_hasContext && context != null)
{
// Look for ProcessAsync<T>(TEvent, ITypedRecordHandlerWithContext<T>, ILambdaContext, DeserializationOptions, ProcessingOptions)
processMethod = _typedBatchProcessor.GetType().GetMethods()
.FirstOrDefault(m => m.Name == "ProcessAsync" &&
m.IsGenericMethodDefinition &&
m.GetParameters().Length == 5 &&
m.GetParameters()[1].ParameterType.IsGenericType &&
m.GetParameters()[1].ParameterType.GetGenericTypeDefinition() == typeof(ITypedRecordHandlerWithContext<>) &&
m.GetParameters()[2].ParameterType == typeof(ILambdaContext) &&
m.GetParameters()[3].ParameterType == typeof(DeserializationOptions) &&
m.GetParameters()[4].ParameterType == typeof(ProcessingOptions));
}
else
{
// Look for ProcessAsync<T>(TEvent, ITypedRecordHandler<T>, DeserializationOptions, ProcessingOptions)
processMethod = _typedBatchProcessor.GetType().GetMethods()
.FirstOrDefault(m => m.Name == "ProcessAsync" &&
m.IsGenericMethodDefinition &&
m.GetParameters().Length == 4 &&
m.GetParameters()[1].ParameterType.IsGenericType &&
m.GetParameters()[1].ParameterType.GetGenericTypeDefinition() == typeof(ITypedRecordHandler<>) &&
m.GetParameters()[2].ParameterType == typeof(DeserializationOptions) &&
m.GetParameters()[3].ParameterType == typeof(ProcessingOptions));
}

if (processMethod == null)
{
throw new InvalidOperationException($"Could not find appropriate ProcessAsync method on typed batch processor for handler type '{handlerType.Name}'.");
}

// Make the method generic with the data type
var genericProcessMethod = processMethod.MakeGenericMethod(dataType);

// Call the method
Task processTask;
if (_hasContext && context != null)
{
processTask = (Task)genericProcessMethod.Invoke(_typedBatchProcessor, new object[] { @event, _typedHandler, context, _deserializationOptions, _processingOptions });
}
else
{
processTask = (Task)genericProcessMethod.Invoke(_typedBatchProcessor, new object[] { @event, _typedHandler, _deserializationOptions, _processingOptions });
}

await processTask;
}
}
Loading
Loading