Skip to content

Commit 4735dbf

Browse files
brybacki authored and wendigo committed
Fix graceful shutdown
The graceful shutdown logic compared against the previous node state instead of the new SHUTTING_DOWN node state, so in effect waitActiveTasksToFinish never completed. This behavior has been reproduced and is covered by the new test. The node state is now assigned before the shutdown sequence is started.
1 parent f39a98d commit 4735dbf

File tree

2 files changed

+46
-13
lines changed

2 files changed

+46
-13
lines changed

core/trino-main/src/main/java/io/trino/server/NodeStateManager.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -216,15 +216,22 @@ public synchronized void transitionState(NodeState state)
216216
}
217217
}
218218
case SHUTTING_DOWN -> {
219-
if (currState.state() == DRAINED && nodeState.compareAndSet(currState, currState.toShuttingDown())) {
219+
if (isCoordinator) {
220+
throw new UnsupportedOperationException("Cannot shutdown coordinator");
221+
}
222+
VersionedState shuttingDown = currState.toShuttingDown();
223+
if (currState.state() == DRAINED && nodeState.compareAndSet(currState, shuttingDown)) {
220224
requestTerminate();
221225
return;
222226
}
227+
nodeState.set(shuttingDown);
223228
requestGracefulShutdown();
224-
nodeState.set(currState.toShuttingDown());
225229
return;
226230
}
227231
case DRAINING -> {
232+
if (isCoordinator) {
233+
throw new UnsupportedOperationException("Cannot drain coordinator");
234+
}
228235
if (currState.state() == ACTIVE && nodeState.compareAndSet(currState, currState.toDraining())) {
229236
requestDrain();
230237
return;
@@ -246,9 +253,6 @@ private long nextStateVersion()
246253
private synchronized void requestDrain()
247254
{
248255
log.debug("Drain requested, NodeState: %s", getServerState());
249-
if (isCoordinator) {
250-
throw new UnsupportedOperationException("Cannot drain coordinator");
251-
}
252256

253257
// wait for a grace period (so that draining state is observed by the coordinator) before starting draining
254258
// when coordinator observes draining no new tasks are assigned to this worker
@@ -259,22 +263,16 @@ private synchronized void requestDrain()
259263
private void requestTerminate()
260264
{
261265
log.info("Immediate Shutdown requested");
262-
if (isCoordinator) {
263-
throw new UnsupportedOperationException("Cannot shutdown coordinator");
264-
}
265266

266267
shutdownHandler.schedule(this::terminate, 0, MILLISECONDS);
267268
}
268269

269270
private void requestGracefulShutdown()
270271
{
271272
log.info("Shutdown requested");
272-
if (isCoordinator) {
273-
throw new UnsupportedOperationException("Cannot shutdown coordinator");
274-
}
275273

276-
// wait for a grace period (so that shutting down state is observed by the coordinator) to start the shutdown sequence
277274
VersionedState expectedState = nodeState.get();
275+
// wait for a grace period (so that shutting down state is observed by the coordinator) to start the shutdown sequence
278276
shutdownHandler.schedule(() -> shutdown(expectedState), gracePeriod.toMillis(), MILLISECONDS);
279277
}
280278

core/trino-main/src/test/java/io/trino/server/TestNodeStateManager.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static io.trino.metadata.NodeState.ACTIVE;
5151
import static io.trino.metadata.NodeState.DRAINED;
5252
import static io.trino.metadata.NodeState.DRAINING;
53+
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
5354
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5455
import static java.util.concurrent.TimeUnit.SECONDS;
5556
import static org.assertj.core.api.Assertions.assertThat;
@@ -148,7 +149,41 @@ void testImmediateTransitionToShuttingDownWhenDrained()
148149
}
149150

150151
@Test
151-
void testWaitActiveTasksToFinish()
152+
void testWaitActiveTasksToFinishDuringShutdown()
153+
throws URISyntaxException
154+
{
155+
List<TaskInfo> taskInfos = new ArrayList<>();
156+
TaskInfo task = TaskInfo.createInitialTask(
157+
new TaskId(new StageId("query1", 1), 1, 1),
158+
new URI(""),
159+
"1",
160+
false,
161+
Optional.empty(),
162+
new TaskStats(DateTime.now(), null));
163+
taskInfos.add(task);
164+
tasks.set(taskInfos);
165+
166+
// Shutting down - will wait for tasks to finish
167+
nodeStateManager.transitionState(SHUTTING_DOWN);
168+
assertThat(nodeStateManager.getServerState()).isEqualTo(SHUTTING_DOWN);
169+
170+
// make sure that nodeStateManager registered a listener for tasks to finish
171+
ticker.increment(1, SECONDS);
172+
executor.run();
173+
await().atMost(1, SECONDS).until(() -> sqlTasksObservable.getTasks().size() == 1);
174+
175+
// simulate task completion after some time
176+
tasks.set(Collections.emptyList());
177+
sqlTasksObservable.getTasks().get(task.taskStatus().getTaskId())
178+
.stateChanged(TaskState.FINISHED);
179+
180+
// after NodeStateManager sees the task finish, the node is expected to still report SHUTTING_DOWN
181+
await().atMost(1, SECONDS)
182+
.untilAsserted(() -> assertThat(nodeStateManager.getServerState()).isEqualTo(SHUTTING_DOWN));
183+
}
184+
185+
@Test
186+
void testWaitActiveTasksToFinishDuringDraining()
152187
throws URISyntaxException
153188
{
154189
List<TaskInfo> taskInfos = new ArrayList<>();

0 commit comments

Comments
 (0)