99import com .gruelbox .transactionoutbox .spi .Utils ;
1010import java .lang .reflect .InvocationTargetException ;
1111import java .time .*;
12- import java .util .*;
12+ import java .util .ArrayList ;
13+ import java .util .List ;
14+ import java .util .UUID ;
1315import java .util .concurrent .CompletableFuture ;
1416import java .util .concurrent .Executor ;
1517import java .util .concurrent .atomic .AtomicBoolean ;
1618import java .util .concurrent .atomic .AtomicReference ;
1719import java .util .function .Supplier ;
20+ import lombok .AccessLevel ;
21+ import lombok .RequiredArgsConstructor ;
22+ import lombok .Setter ;
1823import lombok .ToString ;
24+ import lombok .experimental .Accessors ;
1925import lombok .extern .slf4j .Slf4j ;
2026import org .slf4j .MDC ;
2127import org .slf4j .event .Level ;
2228
2329@ Slf4j
24- class TransactionOutboxImpl implements TransactionOutbox , Validatable {
25-
26- private static final int DEFAULT_FLUSH_BATCH_SIZE = 4096 ;
30+ @ RequiredArgsConstructor (access = AccessLevel .PRIVATE )
31+ final class TransactionOutboxImpl implements TransactionOutbox , Validatable {
2732
2833 private final TransactionManager transactionManager ;
2934 private final Persistor persistor ;
@@ -41,39 +46,6 @@ class TransactionOutboxImpl implements TransactionOutbox, Validatable {
4146 private final AtomicBoolean initialized = new AtomicBoolean ();
4247 private final ProxyFactory proxyFactory = new ProxyFactory ();
4348
44- private TransactionOutboxImpl (
45- TransactionManager transactionManager ,
46- Instantiator instantiator ,
47- Submitter submitter ,
48- Duration attemptFrequency ,
49- int blockAfterAttempts ,
50- int flushBatchSize ,
51- Supplier <Clock > clockProvider ,
52- TransactionOutboxListener listener ,
53- Persistor persistor ,
54- Level logLevelTemporaryFailure ,
55- Boolean serializeMdc ,
56- Duration retentionThreshold ,
57- Boolean initializeImmediately ) {
58- this .transactionManager = transactionManager ;
59- this .instantiator = Utils .firstNonNull (instantiator , Instantiator ::usingReflection );
60- this .persistor = persistor ;
61- this .submitter = Utils .firstNonNull (submitter , Submitter ::withDefaultExecutor );
62- this .attemptFrequency = Utils .firstNonNull (attemptFrequency , () -> Duration .of (2 , MINUTES ));
63- this .blockAfterAttempts = blockAfterAttempts < 1 ? 5 : blockAfterAttempts ;
64- this .flushBatchSize = flushBatchSize < 1 ? DEFAULT_FLUSH_BATCH_SIZE : flushBatchSize ;
65- this .clockProvider = clockProvider == null ? Clock ::systemDefaultZone : clockProvider ;
66- this .listener = Utils .firstNonNull (listener , () -> new TransactionOutboxListener () {});
67- this .logLevelTemporaryFailure = Utils .firstNonNull (logLevelTemporaryFailure , () -> Level .WARN );
68- this .validator = new Validator (this .clockProvider );
69- this .serializeMdc = serializeMdc == null || serializeMdc ;
70- this .retentionThreshold = retentionThreshold == null ? Duration .ofDays (7 ) : retentionThreshold ;
71- this .validator .validate (this );
72- if (initializeImmediately == null || initializeImmediately ) {
73- initialize ();
74- }
75- }
76-
7749 @ Override
7850 public void validate (Validator validator ) {
7951 validator .notNull ("transactionManager" , transactionManager );
@@ -424,10 +396,7 @@ private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
424396 entry .setAttempts (entry .getAttempts () + 1 );
425397 var blocked = (entry .getTopic () == null ) && (entry .getAttempts () >= blockAfterAttempts );
426398 entry .setBlocked (blocked );
427- entry .setLastAttemptTime (Instant .now (clockProvider .get ()));
428- entry .setNextAttemptTime (after (attemptFrequency ));
429- validator .validate (entry );
430- transactionManager .inTransactionThrows (transaction -> persistor .update (transaction , entry ));
399+ transactionManager .inTransactionThrows (tx -> pushBack (tx , entry ));
431400 listener .failure (entry , cause );
432401 if (blocked ) {
433402 log .error (
@@ -462,46 +431,43 @@ static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder {
462431 }
463432
464433 public TransactionOutboxImpl build () {
465- return new TransactionOutboxImpl (
466- transactionManager ,
467- instantiator ,
468- submitter ,
469- attemptFrequency ,
470- blockAfterAttempts ,
471- flushBatchSize ,
472- clockProvider ,
473- listener ,
474- persistor ,
475- logLevelTemporaryFailure ,
476- serializeMdc ,
477- retentionThreshold ,
478- initializeImmediately );
434+ Validator validator = new Validator (this .clockProvider );
435+ TransactionOutboxImpl impl =
436+ new TransactionOutboxImpl (
437+ transactionManager ,
438+ persistor ,
439+ Utils .firstNonNull (instantiator , Instantiator ::usingReflection ),
440+ Utils .firstNonNull (submitter , Submitter ::withDefaultExecutor ),
441+ Utils .firstNonNull (attemptFrequency , () -> Duration .of (2 , MINUTES )),
442+ Utils .firstNonNull (logLevelTemporaryFailure , () -> Level .WARN ),
443+ blockAfterAttempts < 1 ? 5 : blockAfterAttempts ,
444+ flushBatchSize < 1 ? 4096 : flushBatchSize ,
445+ clockProvider == null ? Clock ::systemDefaultZone : clockProvider ,
446+ Utils .firstNonNull (listener , () -> TransactionOutboxListener .EMPTY ),
447+ serializeMdc == null || serializeMdc ,
448+ validator ,
449+ retentionThreshold == null ? Duration .ofDays (7 ) : retentionThreshold );
450+ validator .validate (impl );
451+ if (initializeImmediately == null || initializeImmediately ) {
452+ impl .initialize ();
453+ }
454+ return impl ;
479455 }
480456 }
481457
458+ @ Accessors (fluent = true , chain = true )
459+ @ Setter
482460 private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleBuilder {
483461
484462 private String uniqueRequestId ;
485- private String topic ;
486-
487- @ Override
488- public ParameterizedScheduleBuilder uniqueRequestId (String uniqueRequestId ) {
489- this .uniqueRequestId = uniqueRequestId ;
490- return this ;
491- }
492-
493- @ Override
494- public ParameterizedScheduleBuilder ordered (String topic ) {
495- this .topic = topic ;
496- return this ;
497- }
463+ private String ordered ;
498464
499465 @ Override
500466 public <T > T schedule (Class <T > clazz ) {
501467 if (uniqueRequestId != null && uniqueRequestId .length () > 250 ) {
502468 throw new IllegalArgumentException ("uniqueRequestId may be up to 250 characters" );
503469 }
504- return TransactionOutboxImpl .this .schedule (clazz , uniqueRequestId , topic );
470+ return TransactionOutboxImpl .this .schedule (clazz , uniqueRequestId , ordered );
505471 }
506472 }
507473}
0 commit comments