Skip to content

Commit 4d39d26

Browse files
Merge pull request #588 from gruelbox/sequence-support
Support for ordered processing
2 parents a9b8c51 + d983747 commit 4d39d26

File tree

17 files changed

+678
-257
lines changed

17 files changed

+678
-257
lines changed

.github/workflows/cd_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
- name: Build, publish to GPR and tag
2626
run: |
2727
if [ "$GITHUB_REPOSITORY" == "gruelbox/transaction-outbox" ]; then
28-
revision="5.4.$GITHUB_RUN_NUMBER"
28+
revision="5.5.$GITHUB_RUN_NUMBER"
2929
echo "Building $revision at $GITHUB_SHA"
3030
mvn -Pconcise,delombok -B deploy -s $GITHUB_WORKSPACE/settings.xml -Drevision="$revision" -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
3131
echo "Tagging $revision"

README.md

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ A flexible implementation of the [Transaction Outbox Pattern](https://microservi
2525
1. [Set up the background worker](#set-up-the-background-worker)
2626
1. [Managing the "dead letter queue"](#managing-the-dead-letter-queue)
2727
1. [Advanced](#advanced)
28+
1. [Topics and FIFO ordering](#topics-and-fifo-ordering)
2829
1. [The nested outbox pattern](#the-nested-outbox-pattern)
2930
1. [Idempotency protection](#idempotency-protection)
3031
1. [Flexible serialization](#flexible-serialization-beta)
@@ -291,20 +292,68 @@ TransactionOutbox.builder()
291292

292293
To mark the work for reprocessing, just use [`TransactionOutbox.unblock()`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-core/latest/com/gruelbox/transactionoutbox/TransactionOutbox.html). Its failure count will be marked back down to zero and it will get reprocessed on the next call to `flush()`:
293294

294-
```
295+
```java
295296
transactionOutboxEntry.unblock(entryId);
296297
```
297298

298299
Or if using a `TransactionManager` that relies on explicit context (such as a non-thread local [`JooqTransactionManager`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-jooq/latest/com/gruelbox/transactionoutbox/JooqTransactionManager.html)):
299300

300-
```
301+
```java
301302
transactionOutboxEntry.unblock(entryId, context);
302303
```
303304

304305
A good approach here is to use the [`TransactionOutboxListener`](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-core/latest/com/gruelbox/transactionoutbox/TransactionOutboxListener.html) callback to post an [interactive Slack message](https://api.slack.com/legacy/interactive-messages) - this can operate as both the alert and the "button" allowing a support engineer to submit the work for reprocessing.
305306

306307
## Advanced
307308

309+
### Topics and FIFO ordering
310+
311+
For some applications, the order in which tasks are processed is important, such as when:
312+
313+
- using the outbox to write to a FIFO queue, Kafka or AWS Kinesis topic; or
314+
- data replication, e.g. when feeding a data warehouse or distributed cache.
315+
316+
In these scenarios, the default behaviour is unsuitable. Tasks are usually processed in a highly parallel fashion.
317+
Even if the volume of tasks is low, if a task fails and is retried later, it can easily end up processing after
318+
some later task even if that later task was processed hours or even days after the failing one.
319+
320+
To avoid problems associated with tasks being processed out-of-order, you can order the processing of your tasks
321+
within a named "topic":
322+
323+
```java
324+
outbox.with().ordered("topic1").schedule(Service.class).process("red");
325+
outbox.with().ordered("topic2").schedule(Service.class).process("green");
326+
outbox.with().ordered("topic1").schedule(Service.class).process("blue");
327+
outbox.with().ordered("topic2").schedule(Service.class).process("yellow");
328+
```
329+
330+
No matter what happens:
331+
332+
- `red` will always need to be processed (successfully) before `blue`;
333+
- `green` will always need to be processed (successfully) before `yellow`; but
334+
- `red` and `blue` can run in any sequence with respect to `green` and `yellow`.
335+
336+
This functionality was specifically designed to allow outboxed writing to Kafka topics. For maximum throughput
337+
when writing to Kafka, it is advised that you form your outbox topic name by combining the Kafka topic and partition,
338+
since that is the boundary where ordering is required.
339+
340+
There are a number of things to consider before using this feature:
341+
342+
- Tasks are not processed immediately when submitting, as normal, and are processed by
343+
background flushing only. This means there will be an increased delay between the source transaction being
344+
committed and the task being processed, depending on how your application calls `TransactionOutbox.flush()`.
345+
- If a task fails, no further requests will be processed _in that topic_ until
346+
a subsequent retry allows the failing task to succeed, to preserve ordered
347+
processing. This means it is possible for topics to become entirely frozen in the event
348+
that a task fails repeatedly. For this reason, it is essential to use a
349+
`TransactionOutboxListener` to watch for failing tasks and investigate quickly. Note
350+
that other topics will be unaffected.
351+
- `TransactionOutboxBuilder.blockAfterAttempts` is ignored for all tasks that use this
352+
option.
353+
- A single topic can only be processed in single-threaded fashion, but separate topics can be processed in
354+
parallel. If your tasks use a small number of topics, scalability will be affected since the degree of
355+
parallelism will be reduced.
356+
308357
### The nested-outbox pattern
309358

310359
In practice it can be extremely hard to guarantee that an entire unit of work is idempotent and thus suitable for retry. For example, the request might be to "update a customer record" with a new address, but this might record the change to an audit history table with a fresh UUID, the current date and time and so on, which in turn triggers external changes outside the transaction. The parent customer update request may be idempotent, but the downstream effects may not be.

transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Collection;
77
import java.util.Map;
88
import java.util.TreeMap;
9+
import java.util.function.BiFunction;
910
import java.util.function.Function;
1011
import java.util.stream.Stream;
1112
import lombok.AccessLevel;
@@ -41,6 +42,17 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep
4142
}
4243
}
4344

45+
@Override
46+
public String fetchAndLockNextInTopic(String fields, String table) {
47+
return String.format(
48+
"SELECT %s FROM %s"
49+
+ " WHERE topic = ?"
50+
+ " AND processed = %s"
51+
+ " ORDER BY seq ASC"
52+
+ " %s FOR UPDATE",
53+
fields, table, booleanValue(false), limitCriteria.replace("?", "1"));
54+
}
55+
4456
@Override
4557
public String toString() {
4658
return name;
@@ -63,6 +75,7 @@ static final class Builder {
6375
private Map<Integer, Migration> migrations;
6476
private Function<Boolean, String> booleanValueFrom;
6577
private SQLAction createVersionTableBy;
78+
private BiFunction<String, String, String> fetchAndLockNextInTopic;
6679

6780
Builder(String name) {
6881
this.name = name;
@@ -120,6 +133,27 @@ static final class Builder {
120133
8,
121134
"Update length of invocation column on outbox for MySQL dialects only.",
122135
"ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT"));
136+
migrations.put(
137+
9,
138+
new Migration(
139+
9,
140+
"Add topic",
141+
"ALTER TABLE TXNO_OUTBOX ADD COLUMN topic VARCHAR(250) NOT NULL DEFAULT '*'"));
142+
migrations.put(
143+
10,
144+
new Migration(10, "Add sequence", "ALTER TABLE TXNO_OUTBOX ADD COLUMN seq BIGINT NULL"));
145+
migrations.put(
146+
11,
147+
new Migration(
148+
11,
149+
"Add sequence table",
150+
"CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq BIGINT NOT NULL, PRIMARY KEY (topic, seq))"));
151+
migrations.put(
152+
12,
153+
new Migration(
154+
12,
155+
"Add flush index to support ordering",
156+
"CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)"));
123157
}
124158

125159
Builder setMigration(Migration migration) {
@@ -154,6 +188,14 @@ public void createVersionTableIfNotExists(Connection connection) throws SQLExcep
154188
super.createVersionTableIfNotExists(connection);
155189
}
156190
}
191+
192+
@Override
193+
public String fetchAndLockNextInTopic(String fields, String table) {
194+
if (fetchAndLockNextInTopic != null) {
195+
return fetchAndLockNextInTopic.apply(fields, table);
196+
}
197+
return super.fetchAndLockNextInTopic(fields, table);
198+
}
157199
};
158200
}
159201
}

0 commit comments

Comments
 (0)