Skip to content

Commit aee11a8

Browse files
authored
Fix test server publishing the Completion of child workflows before the Start sometimes (#1289)
Issue #1288
1 parent 462a3cf commit aee11a8

File tree

4 files changed

+30
-28
lines changed

4 files changed

+30
-28
lines changed

temporal-sdk/src/test/java/io/temporal/common/reporter/TestStatsReporter.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public final class TestStatsReporter implements StatsReporter {
3838
private final Map<String, Double> gauges = new HashMap<>();
3939
private final Map<String, StatsAccumulator> timers = new HashMap<>();
4040

41-
public void assertCounter(String name, Map<String, String> tags) {
41+
public synchronized void assertCounter(String name, Map<String, String> tags) {
4242
String metricName = getMetricName(name, tags);
4343
if (!counters.containsKey(metricName)) {
4444
fail(
@@ -49,7 +49,7 @@ public void assertCounter(String name, Map<String, String> tags) {
4949
}
5050
}
5151

52-
public void assertNoMetric(String name, Map<String, String> tags) {
52+
public synchronized void assertNoMetric(String name, Map<String, String> tags) {
5353
String metricName = getMetricName(name, tags);
5454
if (counters.containsKey(metricName)) {
5555
fail(
@@ -60,7 +60,7 @@ public void assertNoMetric(String name, Map<String, String> tags) {
6060
}
6161
}
6262

63-
public void assertCounter(String name, Map<String, String> tags, long expected) {
63+
public synchronized void assertCounter(String name, Map<String, String> tags, long expected) {
6464
String metricName = getMetricName(name, tags);
6565
AtomicLong accumulator = counters.get(metricName);
6666
if (accumulator == null) {
@@ -73,11 +73,12 @@ public void assertCounter(String name, Map<String, String> tags, long expected)
7373
assertEquals(String.valueOf(accumulator.get()), expected, accumulator.get());
7474
}
7575

76-
public void assertGauge(String name, Map<String, String> tags, double expected) {
76+
public synchronized void assertGauge(String name, Map<String, String> tags, double expected) {
7777
assertGauge(name, tags, val -> Math.abs(expected - val) < 1e-3);
7878
}
7979

80-
public void assertGauge(String name, Map<String, String> tags, Predicate<Double> isExpected) {
80+
public synchronized void assertGauge(
81+
String name, Map<String, String> tags, Predicate<Double> isExpected) {
8182
String metricName = getMetricName(name, tags);
8283
Double value = gauges.get(metricName);
8384
if (value == null) {
@@ -90,7 +91,7 @@ public void assertGauge(String name, Map<String, String> tags, Predicate<Double>
9091
assertTrue(String.valueOf(value), isExpected.test(value));
9192
}
9293

93-
public void assertTimer(String name, Map<String, String> tags) {
94+
public synchronized void assertTimer(String name, Map<String, String> tags) {
9495
String metricName = getMetricName(name, tags);
9596
if (!timers.containsKey(metricName)) {
9697
fail(
@@ -101,7 +102,8 @@ public void assertTimer(String name, Map<String, String> tags) {
101102
}
102103
}
103104

104-
public void assertTimerMinDuration(String name, Map<String, String> tags, Duration minDuration) {
105+
public synchronized void assertTimerMinDuration(
106+
String name, Map<String, String> tags, Duration minDuration) {
105107
String metricName = getMetricName(name, tags);
106108
StatsAccumulator value = timers.get(metricName);
107109
if (value == null) {
@@ -134,7 +136,7 @@ public synchronized void reportGauge(String name, Map<String, String> tags, doub
134136
}
135137

136138
@Override
137-
public void reportTimer(
139+
public synchronized void reportTimer(
138140
String name, Map<String, String> tags, com.uber.m3.util.Duration interval) {
139141
String metricName = getMetricName(name, tags);
140142
StatsAccumulator value = timers.get(metricName);
@@ -147,7 +149,7 @@ public void reportTimer(
147149

148150
@SuppressWarnings("deprecation")
149151
@Override
150-
public void reportHistogramValueSamples(
152+
public synchronized void reportHistogramValueSamples(
151153
String name,
152154
Map<String, String> tags,
153155
com.uber.m3.tally.Buckets buckets,
@@ -159,7 +161,7 @@ public void reportHistogramValueSamples(
159161

160162
@SuppressWarnings("deprecation")
161163
@Override
162-
public void reportHistogramDurationSamples(
164+
public synchronized void reportHistogramDurationSamples(
163165
String name,
164166
Map<String, String> tags,
165167
com.uber.m3.tally.Buckets buckets,

temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ public class ActivityTimeoutTest {
4343
private TestWorkflowEnvironment testEnvironment;
4444
private static final String TASK_QUEUE = "test-activities";
4545

46-
public @Rule Timeout timeout = Timeout.seconds(10);
46+
// TODO This test takes longer than it should to complete because
47+
// of the cached heartbeat that prevents a quit shutdown
48+
public @Rule Timeout timeout = Timeout.seconds(12);
4749

4850
@Before
4951
public void setUp() {
@@ -56,8 +58,6 @@ public void tearDown() {
5658
testEnvironment.close();
5759
}
5860

59-
// TODO This test takes longer than it should to complete because
60-
// of the cached heartbeat that prevents a quit shutdown
6161
@Test(timeout = 11_000)
6262
public void testActivityStartToCloseTimeout() {
6363
Worker worker = testEnvironment.newWorker(TASK_QUEUE);

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ <R> void action(Action action, RequestContext context, R request, long reference
226226
(TransitionDestination<Data, R>) transitions.get(transition);
227227
if (destination == null) {
228228
throw Status.INTERNAL
229-
.withDescription(this.data + "Invalid " + transition + ", history: " + transitionHistory)
229+
.withDescription(this.data + " Invalid " + transition + ", history: " + transitionHistory)
230230
.asRuntimeException();
231231
}
232232
state = destination.apply(context, data, request, referenceId);

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,20 +1544,20 @@ public void startWorkflow(
15441544
.setNamespace(getExecutionId().getNamespace())
15451545
.setWorkflowType(startRequest.getWorkflowType())
15461546
.build();
1547-
ForkJoinPool.commonPool()
1548-
.execute(
1549-
() -> {
1550-
try {
1551-
parent.get().childWorkflowStarted(a);
1552-
} catch (StatusRuntimeException e) {
1553-
// NOT_FOUND is expected as the parent might just close by now.
1554-
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
1555-
log.error("Failure reporting child completion", e);
1556-
}
1557-
} catch (Throwable e) {
1558-
log.error("Failure trying to add task for an delayed workflow retry", e);
1559-
}
1560-
});
1547+
1548+
// notifying the parent state machine in the same transaction and thread, otherwise the parent
1549+
// may see
1550+
// completion before start if it's done asynchronously.
1551+
try {
1552+
parent.get().childWorkflowStarted(a);
1553+
} catch (StatusRuntimeException e) {
1554+
// NOT_FOUND is expected as the parent might just close by now.
1555+
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
1556+
log.error("Failure reporting child completion", e);
1557+
}
1558+
} catch (Exception e) {
1559+
log.error("Failure trying to add task for an delayed workflow retry", e);
1560+
}
15611561
}
15621562
}
15631563

0 commit comments

Comments
 (0)