
Commit fce5cb3

[FLINK-37747][runtime] Use old subtask count for restored committable objects (apache#26518)
Co-authored-by: David Wang <[email protected]>

1 parent ddd65bd · commit fce5cb3

File tree: 3 files changed (+183, -22 lines)


flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

Lines changed: 6 additions & 1 deletion
```diff
@@ -183,7 +183,12 @@ private void commitAndEmit(CheckpointCommittableManager<CommT> committableManage
 
     private void emit(CheckpointCommittableManager<CommT> committableManager) {
         int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        // Ensure that numberOfSubtasks is in sync with the number of actually emitted
+        // CommittableSummaries during upscaling recovery (see FLINK-37747).
+        int numberOfSubtasks =
+                Math.min(
+                        getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
+                        committableManager.getNumberOfSubtasks());
         long checkpointId = committableManager.getCheckpointId();
         Collection<CommT> committables = committableManager.getSuccessfulCommittables();
         output.collect(
```
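The gist of the fix: after an upscaled recovery, only the pre-scaling subtasks hold restored committables, so the emitted CommittableSummary must advertise the old subtask count rather than the new parallelism. A minimal sketch of the clamping behavior in isolation (the helper below is illustrative, not part of the Flink API):

```java
// Illustrative helper, not Flink API: the subtask count a restored
// CommittableSummary should advertise after rescaling (see FLINK-37747).
final class SubtaskCountExample {

    static int effectiveNumberOfSubtasks(int currentParallelism, int restoredSubtaskCount) {
        // After an upscaled recovery only the restored subtasks re-emit summaries,
        // so advertising the new, larger parallelism would leave downstream
        // collectors waiting for CommittableSummaries that never arrive.
        return Math.min(currentParallelism, restoredSubtaskCount);
    }

    public static void main(String[] args) {
        System.out.println(effectiveNumberOfSubtasks(10, 1)); // upscale 1 -> 10: prints 1
        System.out.println(effectiveNumberOfSubtasks(1, 2));  // downscale 2 -> 1: prints 1
        System.out.println(effectiveNumberOfSubtasks(2, 2));  // unchanged: prints 2
    }
}
```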

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java

Lines changed: 22 additions & 21 deletions
```diff
@@ -36,6 +36,7 @@
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collection;
@@ -131,7 +132,7 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
         SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0);
         testHarness.open();
         testHarness.setProcessingTime(0);
 
@@ -164,11 +165,13 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
         testHarness.close();
     }
 
-    @Test
-    void testStateRestore() throws Exception {
+    @ParameterizedTest
+    @CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
+    void testStateRestoreWithScaling(
+            int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
+            throws Exception {
 
         final int originalSubtaskId = 0;
-        final int subtaskIdAfterRecovery = 9;
 
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
@@ -177,8 +180,8 @@ void testStateRestore() throws Exception {
                         sinkWithPostCommitWithRetry().sink,
                         false,
                         true,
-                        1,
-                        1,
+                        parallelismBeforeScaling,
+                        parallelismBeforeScaling,
                         originalSubtaskId);
         testHarness.open();
 
@@ -187,15 +190,17 @@ void testStateRestore() throws Exception {
         long checkpointId = 0L;
 
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId, parallelismBeforeScaling, checkpointId, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableWithLineage<String> first =
                 new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
         testHarness.processElement(new StreamRecord<>(first));
 
         // another committable for the same checkpointId but from different subtask.
         final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 0);
+                new CommittableSummary<>(
+                        originalSubtaskId + 1, parallelismBeforeScaling, checkpointId, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
         final CommittableWithLineage<String> second =
                 new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
@@ -213,7 +218,12 @@ void testStateRestore() throws Exception {
                         CommittableMessage<String>, CommittableMessage<String>>
                 restoredHarness =
                         createTestHarness(
-                                restored.sink, false, true, 10, 10, subtaskIdAfterRecovery);
+                                restored.sink,
+                                false,
+                                true,
+                                parallelismAfterScaling,
+                                parallelismAfterScaling,
+                                subtaskIdAfterRecovery);
 
         restoredHarness.initializeState(snapshot);
         restoredHarness.open();
@@ -226,7 +236,9 @@ void testStateRestore() throws Exception {
         records.element(0, as(committableSummary()))
                 .hasCheckpointId(checkpointId)
                 .hasFailedCommittables(0)
-                .hasSubtaskId(subtaskIdAfterRecovery);
+                .hasSubtaskId(subtaskIdAfterRecovery)
+                .hasNumberOfSubtasks(
+                        Math.min(parallelismBeforeScaling, parallelismAfterScaling));
         objectCommittableSummaryAssert.hasOverallCommittables(2);
 
         // Expect the same checkpointId that the original snapshot was made with.
@@ -314,17 +326,6 @@ void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Ex
         }
     }
 
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(
-                    SupportsCommitter<String> sink,
-                    boolean isBatchMode,
-                    boolean isCheckpointingEnabled)
-                    throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
-    }
-
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
```
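The three @CsvSource rows above exercise an upscale (1 → 10, recovering on subtask 9), a downscale (2 → 1), and unchanged parallelism (2, recovering on a different subtask). In every case the restored summary is expected to advertise min(before, after) subtasks, matching the clamping in CommitterOperator#emit. A tiny standalone check of that expectation (plain Java, outside the harness):

```java
// Expected hasNumberOfSubtasks(...) value per @CsvSource row, assuming the
// Math.min clamping from CommitterOperator#emit applies on restore.
public class ScalingExpectations {
    public static void main(String[] args) {
        int[][] rows = {{1, 10, 9}, {2, 1, 0}, {2, 2, 1}}; // before, after, recovery subtask
        for (int[] row : rows) {
            System.out.printf(
                    "parallelism %d -> %d (subtask %d): summary advertises %d subtask(s)%n",
                    row[0], row[1], row[2], Math.min(row[0], row[1]));
        }
    }
}
```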

flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java

Lines changed: 155 additions & 0 deletions
```diff
@@ -17,37 +17,61 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExternalizedCheckpointRetention;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 import org.apache.flink.testutils.junit.SharedReference;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -125,6 +149,49 @@ private static Record<Integer> flipValue(Record<Integer> r) {
         return r.withValue(-r.getValue());
     }
 
+    @ParameterizedTest
+    @CsvSource({"1, 2", "2, 1", "1, 1"})
+    public void writerAndCommitterExecuteInStreamingModeWithScaling(
+            int initialParallelism,
+            int scaledParallelism,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        SharedReference<Queue<Committer.CommitRequest<Record<Integer>>>> committed =
+                SHARED_OBJECTS.add(new ConcurrentLinkedQueue<>());
+        final TrackingCommitter trackingCommitter = new TrackingCommitter(committed);
+        final Configuration config = createConfigForScalingTest(checkpointDir, initialParallelism);
+
+        // first run
+        final JobID jobID =
+                runStreamingWithScalingTest(
+                        config,
+                        initialParallelism,
+                        trackingCommitter,
+                        true,
+                        miniCluster,
+                        clusterClient);
+
+        // second run
+        config.set(StateRecoveryOptions.SAVEPOINT_PATH, getCheckpointPath(miniCluster, jobID));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, scaledParallelism);
+        runStreamingWithScalingTest(
+                config, initialParallelism, trackingCommitter, false, miniCluster, clusterClient);
+
+        assertThat(committed.get())
+                .extracting(Committer.CommitRequest::getCommittable)
+                .containsExactlyInAnyOrderElementsOf(
+                        duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE));
+    }
+
+    private static List<Record<Integer>> duplicate(List<Record<Integer>> values) {
+        return IntStream.range(0, 2)
+                .boxed()
+                .flatMap(i -> values.stream())
+                .collect(Collectors.toList());
+    }
+
     @Test
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
@@ -184,6 +251,66 @@ private StreamExecutionEnvironment buildStreamEnv() {
         return env;
     }
 
+    private Configuration createConfigForScalingTest(File checkpointDir, int parallelism) {
+        final Configuration config = new Configuration();
+        config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2000);
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+
+        return config;
+    }
+
+    private StreamExecutionEnvironment buildStreamEnvWithCheckpointDir(Configuration config) {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+
+        return env;
+    }
+
+    private JobID runStreamingWithScalingTest(
+            Configuration config,
+            int parallelism,
+            TrackingCommitter trackingCommitter,
+            boolean shouldMapperFail,
+            MiniCluster miniCluster,
+            ClusterClient<?> clusterClient)
+            throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnvWithCheckpointDir(config);
+        final Source<Integer, ?, ?> source = createStreamingSource();
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+                .rebalance()
+                .map(
+                        new FailingCheckpointMapper(
+                                SHARED_OBJECTS.add(new AtomicBoolean(!shouldMapperFail))))
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setCommitter(trackingCommitter, RecordSerializer::new)
+                                .setWithPostCommitTopology(true)
+                                .build());
+
+        final JobID jobId = clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+        clusterClient.requestJobResult(jobId).get();
+
+        return jobId;
+    }
+
+    private String getCheckpointPath(MiniCluster miniCluster, JobID secondJobId)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        final Optional<String> completedCheckpoint =
+                CommonTestUtils.getLatestCompletedCheckpointPath(secondJobId, miniCluster);
+
+        assertThat(completedCheckpoint).isPresent();
+        return completedCheckpoint.get();
+    }
+
     private StreamExecutionEnvironment buildBatchEnv() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -245,4 +372,32 @@ public void commit(Collection<CommitRequest<Record<Integer>>> committables) {
         @Override
         public void close() {}
     }
+
+    private static class FailingCheckpointMapper
+            implements MapFunction<Integer, Integer>, CheckpointListener {
+
+        private final SharedReference<AtomicBoolean> failed;
+        private long lastCheckpointId = 0;
+        private int emittedBetweenCheckpoint = 0;
+
+        FailingCheckpointMapper(SharedReference<AtomicBoolean> failed) {
+            this.failed = failed;
+        }
+
+        @Override
+        public Integer map(Integer value) {
+            if (lastCheckpointId >= 1 && emittedBetweenCheckpoint > 0 && !failed.get().get()) {
+                failed.get().set(true);
+                throw new RuntimeException("Planned exception.");
+            }
+            emittedBetweenCheckpoint++;
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            emittedBetweenCheckpoint = 0;
+        }
+    }
 }
```
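Two configuration choices make this rescale test work: restarts are disabled, so FailingCheckpointMapper's planned exception terminates the first job instead of recovering it in place, and externalized checkpoints are retained on cancellation, so the second submission can restore from them at a different parallelism. A minimal sketch of that recover-and-rescale configuration (the checkpoint path literal is illustrative; the test resolves the real one via CommonTestUtils.getLatestCompletedCheckpointPath):

```java
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateRecoveryOptions;

// Hedged sketch of the recover-and-rescale configuration used by the test above.
public class RescaleRestoreConfigExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Fail fast: the planned exception should end the first job rather than
        // let it recover in place.
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
        // Keep the checkpoint around after the job dies so a second submission
        // can restore from it.
        config.set(
                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        // Illustrative path; in the test this comes from getCheckpointPath(...).
        config.set(StateRecoveryOptions.SAVEPOINT_PATH, "file:///tmp/chk/<job-id>/chk-3");
        config.set(CoreOptions.DEFAULT_PARALLELISM, 2); // rescale on restore
        System.out.println(config);
    }
}
```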
