Skip to content

Commit 9baf787

Browse files
committed
remove duplication
1 parent 7e0ee4c commit 9baf787

File tree

5 files changed

+221
-259
lines changed

5 files changed

+221
-259
lines changed

libraries/src/AWS.Lambda.Powertools.BatchProcessing/DynamoDb/TypedDynamoDbStreamBatchProcessor.cs

Lines changed: 18 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -160,123 +160,57 @@ protected TypedDynamoDbStreamBatchProcessor() : this(PowertoolsConfigurations.In
160160
/// <summary>
161161
/// Wrapper class that adapts ITypedRecordHandler to IRecordHandler.
162162
/// </summary>
163-
private sealed class TypedRecordHandlerWrapper<T> : IRecordHandler<DynamoDBEvent.DynamodbStreamRecord>
163+
private sealed class TypedRecordHandlerWrapper<T> : TypedRecordHandlerWrapperBase<DynamoDBEvent.DynamodbStreamRecord, T>
164164
{
165165
private readonly ITypedRecordHandler<T> _typedHandler;
166-
private readonly IDeserializationService _deserializationService;
167-
private readonly IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> _recordDataExtractor;
168-
private readonly DeserializationOptions _deserializationOptions;
169166

170167
public TypedRecordHandlerWrapper(
171168
ITypedRecordHandler<T> typedHandler,
172169
IDeserializationService deserializationService,
173170
IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> recordDataExtractor,
174171
DeserializationOptions deserializationOptions)
172+
: base(deserializationService, recordDataExtractor, deserializationOptions)
175173
{
176174
_typedHandler = typedHandler ?? throw new ArgumentNullException(nameof(typedHandler));
177-
_deserializationService = deserializationService ?? throw new ArgumentNullException(nameof(deserializationService));
178-
_recordDataExtractor = recordDataExtractor ?? throw new ArgumentNullException(nameof(recordDataExtractor));
179-
_deserializationOptions = deserializationOptions;
180175
}
181176

182-
public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
177+
protected override async Task<RecordHandlerResult> HandleTypedRecordAsync(T deserializedData, CancellationToken cancellationToken)
183178
{
184-
try
185-
{
186-
var recordData = _recordDataExtractor.ExtractData(record);
187-
188-
// Use TryDeserialize to check if deserialization was successful
189-
if (_deserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord ||
190-
_deserializationOptions?.IgnoreDeserializationErrors == true)
191-
{
192-
if (!_deserializationService.TryDeserialize<T>(recordData, out var deserializedData, out _, _deserializationOptions))
193-
{
194-
// Deserialization failed and we're ignoring errors, don't call the handler
195-
return RecordHandlerResult.None;
196-
}
197-
return await _typedHandler.HandleAsync(deserializedData, cancellationToken);
198-
}
199-
else
200-
{
201-
// Use regular deserialize which will throw on errors
202-
var deserializedData = _deserializationService.Deserialize<T>(recordData, _deserializationOptions);
203-
return await _typedHandler.HandleAsync(deserializedData, cancellationToken);
204-
}
205-
}
206-
catch (DeserializationException ex)
207-
{
208-
// Handle deserialization errors based on policy
209-
if (_deserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord)
210-
{
211-
return RecordHandlerResult.None;
212-
}
213-
214-
// For FailRecord policy or default, re-throw the exception
215-
throw new RecordProcessingException($"Failed to deserialize DynamoDB stream record '{record.Dynamodb.SequenceNumber}' to type '{typeof(T).Name}'. See inner exception for details.", ex);
216-
}
179+
return await _typedHandler.HandleAsync(deserializedData, cancellationToken);
180+
}
181+
182+
protected override string GetDeserializationErrorMessage(DynamoDBEvent.DynamodbStreamRecord record, DeserializationException ex)
183+
{
184+
return $"Failed to deserialize DynamoDB stream record '{record.Dynamodb.SequenceNumber}' to type '{typeof(T).Name}'. See inner exception for details.";
217185
}
218186
}
219187

220188
/// <summary>
221189
/// Wrapper class that adapts ITypedRecordHandlerWithContext to IRecordHandler.
222190
/// </summary>
223-
private sealed class TypedRecordHandlerWithContextWrapper<T> : IRecordHandler<DynamoDBEvent.DynamodbStreamRecord>
191+
private sealed class TypedRecordHandlerWithContextWrapper<T> : TypedRecordHandlerWithContextWrapperBase<DynamoDBEvent.DynamodbStreamRecord, T>
224192
{
225193
private readonly ITypedRecordHandlerWithContext<T> _typedHandler;
226-
private readonly ILambdaContext _context;
227-
private readonly IDeserializationService _deserializationService;
228-
private readonly IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> _recordDataExtractor;
229-
private readonly DeserializationOptions _deserializationOptions;
230194

231195
public TypedRecordHandlerWithContextWrapper(
232196
ITypedRecordHandlerWithContext<T> typedHandler,
233197
ILambdaContext context,
234198
IDeserializationService deserializationService,
235199
IRecordDataExtractor<DynamoDBEvent.DynamodbStreamRecord> recordDataExtractor,
236200
DeserializationOptions deserializationOptions)
201+
: base(context, deserializationService, recordDataExtractor, deserializationOptions)
237202
{
238203
_typedHandler = typedHandler ?? throw new ArgumentNullException(nameof(typedHandler));
239-
_context = context; // Context can be null
240-
_deserializationService = deserializationService ?? throw new ArgumentNullException(nameof(deserializationService));
241-
_recordDataExtractor = recordDataExtractor ?? throw new ArgumentNullException(nameof(recordDataExtractor));
242-
_deserializationOptions = deserializationOptions;
243204
}
244205

245-
public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
206+
protected override async Task<RecordHandlerResult> HandleTypedRecordWithContextAsync(T deserializedData, ILambdaContext context, CancellationToken cancellationToken)
207+
{
208+
return await _typedHandler.HandleAsync(deserializedData, context, cancellationToken);
209+
}
210+
211+
protected override string GetDeserializationErrorMessage(DynamoDBEvent.DynamodbStreamRecord record, DeserializationException ex)
246212
{
247-
try
248-
{
249-
var recordData = _recordDataExtractor.ExtractData(record);
250-
251-
// Use TryDeserialize to check if deserialization was successful
252-
if (_deserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord ||
253-
_deserializationOptions?.IgnoreDeserializationErrors == true)
254-
{
255-
if (!_deserializationService.TryDeserialize<T>(recordData, out var deserializedData, out _, _deserializationOptions))
256-
{
257-
// Deserialization failed and we're ignoring errors, don't call the handler
258-
return RecordHandlerResult.None;
259-
}
260-
return await _typedHandler.HandleAsync(deserializedData, _context, cancellationToken);
261-
}
262-
else
263-
{
264-
// Use regular deserialize which will throw on errors
265-
var deserializedData = _deserializationService.Deserialize<T>(recordData, _deserializationOptions);
266-
return await _typedHandler.HandleAsync(deserializedData, _context, cancellationToken);
267-
}
268-
}
269-
catch (DeserializationException ex)
270-
{
271-
// Handle deserialization errors based on policy
272-
if (_deserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord)
273-
{
274-
return RecordHandlerResult.None;
275-
}
276-
277-
// For FailRecord policy or default, re-throw the exception
278-
throw new RecordProcessingException($"Failed to deserialize DynamoDB stream record '{record.Dynamodb.SequenceNumber}' to type '{typeof(T).Name}'. See inner exception for details.", ex);
279-
}
213+
return $"Failed to deserialize DynamoDB stream record '{record.Dynamodb.SequenceNumber}' to type '{typeof(T).Name}'. See inner exception for details.";
280214
}
281215
}
282216
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Amazon.Lambda.Core;
5+
using AWS.Lambda.Powertools.BatchProcessing.Exceptions;
6+
7+
namespace AWS.Lambda.Powertools.BatchProcessing.Internal;
8+
9+
/// <summary>
10+
/// Base wrapper class that adapts ITypedRecordHandlerWithContext to IRecordHandler with common deserialization logic.
11+
/// </summary>
12+
/// <typeparam name="TRecord">The type of the record being processed.</typeparam>
13+
/// <typeparam name="T">The type to deserialize the record data to.</typeparam>
14+
internal abstract class TypedRecordHandlerWithContextWrapperBase<TRecord, T> : IRecordHandler<TRecord>
15+
{
16+
protected readonly ILambdaContext Context;
17+
protected readonly IDeserializationService DeserializationService;
18+
protected readonly IRecordDataExtractor<TRecord> RecordDataExtractor;
19+
protected readonly DeserializationOptions DeserializationOptions;
20+
21+
protected TypedRecordHandlerWithContextWrapperBase(
22+
ILambdaContext context,
23+
IDeserializationService deserializationService,
24+
IRecordDataExtractor<TRecord> recordDataExtractor,
25+
DeserializationOptions deserializationOptions)
26+
{
27+
Context = context; // Context can be null
28+
DeserializationService = deserializationService ?? throw new ArgumentNullException(nameof(deserializationService));
29+
RecordDataExtractor = recordDataExtractor ?? throw new ArgumentNullException(nameof(recordDataExtractor));
30+
DeserializationOptions = deserializationOptions;
31+
}
32+
33+
public async Task<RecordHandlerResult> HandleAsync(TRecord record, CancellationToken cancellationToken)
34+
{
35+
try
36+
{
37+
var recordData = RecordDataExtractor.ExtractData(record);
38+
39+
// Use TryDeserialize to check if deserialization was successful
40+
if (DeserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord ||
41+
DeserializationOptions?.IgnoreDeserializationErrors == true)
42+
{
43+
if (!DeserializationService.TryDeserialize<T>(recordData, out var deserializedData, out _, DeserializationOptions))
44+
{
45+
// Deserialization failed and we're ignoring errors, don't call the handler
46+
return RecordHandlerResult.None;
47+
}
48+
return await HandleTypedRecordWithContextAsync(deserializedData, Context, cancellationToken);
49+
}
50+
else
51+
{
52+
// Use regular deserialize which will throw on errors
53+
var deserializedData = DeserializationService.Deserialize<T>(recordData, DeserializationOptions);
54+
return await HandleTypedRecordWithContextAsync(deserializedData, Context, cancellationToken);
55+
}
56+
}
57+
catch (DeserializationException ex)
58+
{
59+
// Handle deserialization errors based on policy
60+
if (DeserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord)
61+
{
62+
return RecordHandlerResult.None;
63+
}
64+
65+
// For FailRecord policy or default, re-throw the exception
66+
throw new RecordProcessingException(GetDeserializationErrorMessage(record, ex), ex);
67+
}
68+
}
69+
70+
/// <summary>
71+
/// Handles the typed record with context after successful deserialization.
72+
/// </summary>
73+
/// <param name="deserializedData">The deserialized data.</param>
74+
/// <param name="context">The Lambda context.</param>
75+
/// <param name="cancellationToken">The cancellation token.</param>
76+
/// <returns>The result of handling the record.</returns>
77+
protected abstract Task<RecordHandlerResult> HandleTypedRecordWithContextAsync(T deserializedData, ILambdaContext context, CancellationToken cancellationToken);
78+
79+
/// <summary>
80+
/// Gets the error message for deserialization failures.
81+
/// </summary>
82+
/// <param name="record">The record that failed to deserialize.</param>
83+
/// <param name="ex">The deserialization exception.</param>
84+
/// <returns>The error message.</returns>
85+
protected abstract string GetDeserializationErrorMessage(TRecord record, DeserializationException ex);
86+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using AWS.Lambda.Powertools.BatchProcessing.Exceptions;
5+
6+
namespace AWS.Lambda.Powertools.BatchProcessing.Internal;
7+
8+
/// <summary>
9+
/// Base wrapper class that adapts ITypedRecordHandler to IRecordHandler with common deserialization logic.
10+
/// </summary>
11+
/// <typeparam name="TRecord">The type of the record being processed.</typeparam>
12+
/// <typeparam name="T">The type to deserialize the record data to.</typeparam>
13+
internal abstract class TypedRecordHandlerWrapperBase<TRecord, T> : IRecordHandler<TRecord>
14+
{
15+
protected readonly IDeserializationService DeserializationService;
16+
protected readonly IRecordDataExtractor<TRecord> RecordDataExtractor;
17+
protected readonly DeserializationOptions DeserializationOptions;
18+
19+
protected TypedRecordHandlerWrapperBase(
20+
IDeserializationService deserializationService,
21+
IRecordDataExtractor<TRecord> recordDataExtractor,
22+
DeserializationOptions deserializationOptions)
23+
{
24+
DeserializationService = deserializationService ?? throw new ArgumentNullException(nameof(deserializationService));
25+
RecordDataExtractor = recordDataExtractor ?? throw new ArgumentNullException(nameof(recordDataExtractor));
26+
DeserializationOptions = deserializationOptions;
27+
}
28+
29+
public async Task<RecordHandlerResult> HandleAsync(TRecord record, CancellationToken cancellationToken)
30+
{
31+
try
32+
{
33+
var recordData = RecordDataExtractor.ExtractData(record);
34+
35+
// Use TryDeserialize to check if deserialization was successful
36+
if (DeserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord ||
37+
DeserializationOptions?.IgnoreDeserializationErrors == true)
38+
{
39+
if (!DeserializationService.TryDeserialize<T>(recordData, out var deserializedData, out _, DeserializationOptions))
40+
{
41+
// Deserialization failed and we're ignoring errors, don't call the handler
42+
return RecordHandlerResult.None;
43+
}
44+
return await HandleTypedRecordAsync(deserializedData, cancellationToken);
45+
}
46+
else
47+
{
48+
// Use regular deserialize which will throw on errors
49+
var deserializedData = DeserializationService.Deserialize<T>(recordData, DeserializationOptions);
50+
return await HandleTypedRecordAsync(deserializedData, cancellationToken);
51+
}
52+
}
53+
catch (DeserializationException ex)
54+
{
55+
// Handle deserialization errors based on policy
56+
if (DeserializationOptions?.ErrorPolicy == DeserializationErrorPolicy.IgnoreRecord)
57+
{
58+
return RecordHandlerResult.None;
59+
}
60+
61+
// For FailRecord policy or default, re-throw the exception
62+
throw new RecordProcessingException(GetDeserializationErrorMessage(record, ex), ex);
63+
}
64+
}
65+
66+
/// <summary>
67+
/// Handles the typed record after successful deserialization.
68+
/// </summary>
69+
/// <param name="deserializedData">The deserialized data.</param>
70+
/// <param name="cancellationToken">The cancellation token.</param>
71+
/// <returns>The result of handling the record.</returns>
72+
protected abstract Task<RecordHandlerResult> HandleTypedRecordAsync(T deserializedData, CancellationToken cancellationToken);
73+
74+
/// <summary>
75+
/// Gets the error message for deserialization failures.
76+
/// </summary>
77+
/// <param name="record">The record that failed to deserialize.</param>
78+
/// <param name="ex">The deserialization exception.</param>
79+
/// <returns>The error message.</returns>
80+
protected abstract string GetDeserializationErrorMessage(TRecord record, DeserializationException ex);
81+
}

0 commit comments

Comments
 (0)