2121import java .util .Arrays ;
2222import java .util .List ;
2323import java .util .UUID ;
24- import java .util .stream . Collectors ;
24+ import java .util .function . Consumer ;
2525
2626import javax .annotation .Nullable ;
2727
4040import software .amazon .kinesis .common .InitialPositionInStreamExtended ;
4141import software .amazon .kinesis .common .StreamConfig ;
4242import software .amazon .kinesis .common .StreamIdentifier ;
43+ import software .amazon .kinesis .coordinator .CoordinatorConfig ;
4344import software .amazon .kinesis .coordinator .Scheduler ;
4445import software .amazon .kinesis .exceptions .InvalidStateException ;
4546import software .amazon .kinesis .exceptions .ShutdownException ;
4647import software .amazon .kinesis .exceptions .ThrottlingException ;
48+ import software .amazon .kinesis .leases .LeaseManagementConfig ;
4749import software .amazon .kinesis .lifecycle .LifecycleConfig ;
4850import software .amazon .kinesis .lifecycle .events .InitializationInput ;
4951import software .amazon .kinesis .lifecycle .events .LeaseLostInput ;
5557import software .amazon .kinesis .metrics .NullMetricsFactory ;
5658import software .amazon .kinesis .processor .FormerStreamsLeasesDeletionStrategy ;
5759import software .amazon .kinesis .processor .MultiStreamTracker ;
60+ import software .amazon .kinesis .processor .ProcessorConfig ;
5861import software .amazon .kinesis .processor .RecordProcessorCheckpointer ;
5962import software .amazon .kinesis .processor .ShardRecordProcessor ;
6063import software .amazon .kinesis .processor .ShardRecordProcessorFactory ;
@@ -151,6 +154,20 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
151154
152155 private MetricsLevel metricsLevel = MetricsLevel .DETAILED ;
153156
157+ private Consumer <CoordinatorConfig > coordinatorConfigCustomizer = (config ) -> {
158+ };
159+
160+ private Consumer <LifecycleConfig > lifecycleConfigCustomizer = (config ) -> {
161+ };
162+
163+ private Consumer <MetricsConfig > metricsConfigCustomizer = (config ) -> {
164+ };
165+
166+ private Consumer <LeaseManagementConfig > leaseManagementConfigCustomizer = (config ) -> {
167+ };
168+
169+ private boolean emptyRecordList ;
170+
154171 public KclMessageDrivenChannelAdapter (String ... streams ) {
155172 this (KinesisAsyncClient .create (), CloudWatchAsyncClient .create (), DynamoDbAsyncClient .create (), streams );
156173 }
@@ -283,9 +300,70 @@ public void setMetricsLevel(MetricsLevel metricsLevel) {
283300 this .metricsLevel = metricsLevel ;
284301 }
285302
303+ /**
304+ * Set a {@link Consumer} to configure a {@link CoordinatorConfig}.
305+ * @param coordinatorConfigCustomizer the {@link Consumer} to configure a {@link CoordinatorConfig}.
306+ * @since 3.0.8
307+ * @see CoordinatorConfig
308+ */
309+ public void setCoordinatorConfigCustomizer (Consumer <CoordinatorConfig > coordinatorConfigCustomizer ) {
310+ Assert .notNull (coordinatorConfigCustomizer , "'coordinatorConfigCustomizer' must not be null" );
311+ this .coordinatorConfigCustomizer = coordinatorConfigCustomizer ;
312+ }
313+
314+ /**
315+ * Set a {@link Consumer} to configure a {@link LifecycleConfig}.
316+ * @param lifecycleConfigCustomizer the {@link Consumer} to configure a {@link LifecycleConfig}.
317+ * @since 3.0.8
318+ * @see LifecycleConfig
319+ */
320+ public void setLifecycleConfigCustomizer (Consumer <LifecycleConfig > lifecycleConfigCustomizer ) {
321+ Assert .notNull (lifecycleConfigCustomizer , "'lifecycleConfigCustomizer' must not be null" );
322+ this .lifecycleConfigCustomizer = lifecycleConfigCustomizer ;
323+ }
324+
325+ /**
326+ * Set a {@link Consumer} to configure a {@link MetricsConfig}.
327+ * May override whatever could be set individually, like {@link #setMetricsLevel(MetricsLevel)}.
328+ * @param metricsConfigCustomizer the {@link Consumer} to configure a {@link MetricsConfig}.
329+ * @since 3.0.8
330+ * @see MetricsConfig
331+ */
332+ public void setMetricsConfigCustomizer (Consumer <MetricsConfig > metricsConfigCustomizer ) {
333+ Assert .notNull (metricsConfigCustomizer , "'metricsConfigCustomizer' must not be null" );
334+ this .metricsConfigCustomizer = metricsConfigCustomizer ;
335+ }
336+
337+ /**
338+ * Set a {@link Consumer} to configure a {@link LeaseManagementConfig}.
339+ * @param leaseManagementConfigCustomizer the {@link Consumer} to configure a {@link LeaseManagementConfig}.
340+ * @since 3.0.8
341+ * @see LeaseManagementConfig
342+ */
343+ public void setLeaseManagementConfigCustomizer (Consumer <LeaseManagementConfig > leaseManagementConfigCustomizer ) {
344+ Assert .notNull (leaseManagementConfigCustomizer , "'leaseManagementConfigCustomizer' must not be null" );
345+ this .leaseManagementConfigCustomizer = leaseManagementConfigCustomizer ;
346+ }
347+
348+ /**
349+ * Whether to return an empty record list from consumer to the processor.
350+ * Works only in {@link ListenerMode#batch} mode.
351+ * The message will be sent into the output channel with an empty {@link List} as a payload.
352+ * @param emptyRecordList true to return an empty record list.
353+ * @since 3.0.8
354+ * @see ProcessorConfig#callProcessRecordsEvenForEmptyRecordList(boolean)
355+ */
356+ public void setEmptyRecordList (boolean emptyRecordList ) {
357+ this .emptyRecordList = emptyRecordList ;
358+ }
359+
286360 @ Override
287361 protected void onInit () {
288362 super .onInit ();
363+ if (this .listenerMode .equals (ListenerMode .record ) && this .emptyRecordList ) {
364+ this .emptyRecordList = false ;
365+ logger .warn ("The 'emptyRecordList' is processed only in the [ListenerMode.batch]." );
366+ }
289367 this .config =
290368 new ConfigsBuilder (buildStreamTracker (),
291369 this .consumerGroup ,
@@ -316,8 +394,9 @@ protected void doStart() {
316394 + "because it does not make sense in case of [ListenerMode.batch]." );
317395 }
318396
319- LifecycleConfig lifecycleConfig = this .config .lifecycleConfig ();
397+ LifecycleConfig lifecycleConfig = this .config .lifecycleConfig ();
320398 lifecycleConfig .taskBackoffTimeMillis (this .consumerBackoff );
399+ this .lifecycleConfigCustomizer .accept (lifecycleConfig );
321400
322401 RetrievalSpecificConfig retrievalSpecificConfig ;
323402 String singleStreamName = this .streams .length == 1 ? this .streams [0 ] : null ;
@@ -342,15 +421,25 @@ protected void doStart() {
342421 if (MetricsLevel .NONE .equals (this .metricsLevel )) {
343422 metricsConfig .metricsFactory (new NullMetricsFactory ());
344423 }
424+ this .metricsConfigCustomizer .accept (metricsConfig );
425+
426+ CoordinatorConfig coordinatorConfig = this .config .coordinatorConfig ();
427+ this .coordinatorConfigCustomizer .accept (coordinatorConfig );
428+
429+ LeaseManagementConfig leaseManagementConfig = this .config .leaseManagementConfig ();
430+ this .leaseManagementConfigCustomizer .accept (leaseManagementConfig );
431+
432+ ProcessorConfig processorConfig = this .config .processorConfig ()
433+ .callProcessRecordsEvenForEmptyRecordList (this .emptyRecordList );
345434
346435 this .scheduler =
347436 new Scheduler (
348437 this .config .checkpointConfig (),
349- this . config . coordinatorConfig () ,
350- this . config . leaseManagementConfig () ,
438+ coordinatorConfig ,
439+ leaseManagementConfig ,
351440 lifecycleConfig ,
352441 metricsConfig ,
353- this . config . processorConfig () ,
442+ processorConfig ,
354443 retrievalConfig );
355444
356445 this .executor .execute (this .scheduler );
@@ -542,7 +631,7 @@ private void processMultipleRecords(List<KinesisClientRecord> records,
542631 records .stream ()
543632 .map (this ::prepareMessageForRecord )
544633 .map (AbstractIntegrationMessageBuilder ::build )
545- .collect ( Collectors . toList () );
634+ .toList ();
546635
547636 messageBuilder = getMessageBuilderFactory ().withPayload (payload );
548637 }
@@ -557,7 +646,7 @@ else if (KclMessageDrivenChannelAdapter.this.converter != null) {
557646
558647 return KclMessageDrivenChannelAdapter .this .converter .convert (r .data ().array ());
559648 })
560- .collect ( Collectors . toList () );
649+ .toList ();
561650
562651 messageBuilder = getMessageBuilderFactory ().withPayload (payload )
563652 .setHeader (AwsHeaders .RECEIVED_PARTITION_KEY , partitionKeys )
0 commit comments