Skip to content

Commit 43e3422

Browse files
authored
Refactor + separately test the instance deletion logic in task scheduling (#9097)
1 parent 526988e commit 43e3422

File tree

5 files changed

+297
-51
lines changed

5 files changed

+297
-51
lines changed

app/lib/task/backend.dart

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import 'package:pub_dev/package/backend.dart';
2424
import 'package:pub_dev/package/models.dart';
2525
import 'package:pub_dev/package/upload_signer_service.dart';
2626
import 'package:pub_dev/scorecard/backend.dart';
27+
import 'package:pub_dev/shared/configuration.dart';
2728
import 'package:pub_dev/shared/datastore.dart';
2829
import 'package:pub_dev/shared/exceptions.dart';
2930
import 'package:pub_dev/shared/redis_cache.dart';
@@ -43,6 +44,7 @@ import 'package:pub_dev/task/clock_control.dart';
4344
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
4445
import 'package:pub_dev/task/global_lock.dart';
4546
import 'package:pub_dev/task/handlers.dart';
47+
import 'package:pub_dev/task/loops/delete_instances.dart';
4648
import 'package:pub_dev/task/loops/scan_packages_updated.dart';
4749
import 'package:pub_dev/task/models.dart'
4850
show
@@ -101,6 +103,7 @@ class TaskBackend {
101103

102104
ScanPackagesUpdatedState _scanPackagesUpdatedState =
103105
ScanPackagesUpdatedState.init();
106+
DeleteInstancesState _deleteInstancesState = DeleteInstancesState.init();
104107

105108
TaskBackend(this._db, this._bucket);
106109

@@ -128,6 +131,13 @@ class TaskBackend {
128131
await _scanForPackageUpdates(claim, abort: aborted);
129132
},
130133
);
134+
final deleteLoop = _createLoop(
135+
name: 'delete-instances',
136+
aborted: aborted,
137+
fn: (claim, aborted) async {
138+
await _scanForInstanceDeletion(claim, abort: aborted);
139+
},
140+
);
131141
final scheduleLoop = _createLoop(
132142
name: 'schedule',
133143
aborted: aborted,
@@ -138,7 +148,7 @@ class TaskBackend {
138148

139149
scheduleMicrotask(() async {
140150
// Wait for background process to finish
141-
await Future.wait([scanLoop, scheduleLoop]);
151+
await Future.wait([scanLoop, deleteLoop, scheduleLoop]);
142152

143153
// Report background processes as stopped
144154
stopped.complete();
@@ -321,6 +331,40 @@ class TaskBackend {
321331
}
322332
}
323333

334+
/// Keeps scanning for compute instances that can be deleted, pausing between
/// scans, until [abort] is completed or [claim] is no longer valid.
Future<void> _scanForInstanceDeletion(
  GlobalLockClaim claim, {
  Completer<void>? abort,
}) async {
  // Without a caller-provided [abort], fall back to a private completer that
  // nothing in this method completes.
  final abortSignal = abort ?? Completer<void>();
  bool isAbortedFn() => !claim.valid || abortSignal.isCompleted;

  while (true) {
    if (isAbortedFn()) {
      break;
    }
    await _runOneInstanceDeletion(isAbortedFn: isAbortedFn);
    if (isAbortedFn()) {
      break;
    }
    // Pause until aborted, or at most 10 minutes, before the next scan.
    await abortSignal.future.timeoutWithClock(
      Duration(minutes: 10),
      onTimeout: () => null,
    );
  }
}
356+
357+
/// Runs a single scan-and-delete pass via [scanAndDeleteInstances] on
/// `taskWorkerCloudCompute`, and stores the returned state so that the next
/// pass starts from it.
Future<void> _runOneInstanceDeletion({
  required bool Function() isAbortedFn,
}) async {
  final nextState = await scanAndDeleteInstances(
    _deleteInstancesState,
    taskWorkerCloudCompute,
    isAbortedFn,
    maxTaskRunHours: activeConfiguration.maxTaskRunHours,
  );
  _deleteInstancesState = nextState;
}
367+
324368
Future<void> trackPackage(
325369
String packageName, {
326370
bool updateDependents = false,

app/lib/task/cloudcompute/fakecloudcompute.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ final class FakeCloudCompute extends CloudCompute {
2626
FakeCloudCompute({InstanceRunner? instanceRunner})
2727
: _instanceRunner = instanceRunner ?? _defaultInstanceRunner;
2828

29+
/// Creates a [FakeCloudCompute] whose instance runner completes immediately
/// without doing any work.
factory FakeCloudCompute.noop() {
  return FakeCloudCompute(instanceRunner: (_) async {});
}
31+
2932
@override
3033
final List<String> zones = const ['zone-a', 'zone-b'];
3134

@@ -105,6 +108,26 @@ final class FakeCloudCompute extends CloudCompute {
105108
}
106109
}
107110

111+
/// Registers a fake instance that is already running, or terminated when
/// [isTerminated] is `true`.
///
/// The instance is placed in [zone] under [instanceName], and its creation
/// time is set to [ago] before the current `clock` time.
///
/// No scheduling or other state update happens: the instance is only added to
/// `_instances`.
void fakeCreateRunningInstance({
  required String zone,
  required String instanceName,
  required Duration ago,
  bool isTerminated = false,
}) {
  _instances.add(
    FakeCloudInstance._(
      instanceName: instanceName,
      zone: zone,
      created: clock.now().subtract(ago),
      // As the method name and documentation promise, an instance that is not
      // terminated is registered as running (it was previously registered as
      // pending).
      state: isTerminated ? InstanceState.terminated : InstanceState.running,
      dockerImage: 'fake-docker-image',
      arguments: [],
      description: 'fake-description',
    ),
  );
}
130+
108131
/// Change state of instance with [instanceName] to [InstanceState.running].
109132
///
110133
/// This does not start any subprocess or run any code. Just changes the state
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:basics/basics.dart';
8+
import 'package:clock/clock.dart';
9+
import 'package:logging/logging.dart';
10+
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
11+
12+
final _log = Logger('pub.task.scan_instances');
13+
14+
/// Holds the state carried between iterations of the instance
/// scan-and-delete loop.
final class DeleteInstancesState {
  /// The deletion start timestamp, keyed by `CloudInstance.instanceName`.
  final Map<String, DateTime> deletions;

  DeleteInstancesState({required this.deletions});

  /// Creates the initial state, with no deletions recorded.
  factory DeleteInstancesState.init() {
    return DeleteInstancesState(deletions: {});
  }
}
24+
25+
/// Runs one scan over the instances listed by [cloudCompute], deletes those
/// that are terminated or older than [maxTaskRunHours], and returns the state
/// to pass into the next scan.
///
/// Instances recorded in [state] with a deletion started within the last
/// 5 minutes are skipped. Listing stops as soon as [isAbortedFn] returns
/// `true`; delete calls that were already started are still awaited.
Future<DeleteInstancesState> scanAndDeleteInstances(
  DeleteInstancesState state,
  CloudCompute cloudCompute,
  bool Function() isAbortedFn, {
  required int maxTaskRunHours,
}) async {
  // Forget deletions that were started more than 5 minutes ago: if such an
  // instance is still listed, deleting it is attempted again.
  final cutoff = clock.ago(minutes: 5);
  final inProgress = <String, DateTime>{
    for (final entry in state.deletions.entries)
      if (entry.value.isAfter(cutoff)) entry.key: entry.value,
  };

  final pendingDeletes = <Future>[];
  await for (final instance in cloudCompute.listInstances()) {
    if (isAbortedFn()) {
      break;
    }

    // Never issue a second delete call for an instance whose deletion is
    // still considered in progress.
    final name = instance.instanceName;
    if (inProgress.containsKey(name)) {
      continue;
    }

    // Only terminated instances, and instances older than the maximum task
    // run time, are deleted.
    final isTerminated = instance.state == InstanceState.terminated;
    final deadline = instance.created.add(Duration(hours: maxTaskRunHours));
    final isTooOld = deadline.isBefore(clock.now());
    if (!isTooOld && !isTerminated) {
      continue;
    }

    if (isTooOld) {
      // Something is wrong with the instance: ideally it should have noticed
      // that its own deadline was violated and terminated on its own. In a
      // distributed system this can fail for arbitrary reasons.
      _log.warning('terminating $instance for being too old!');
    } else {
      _log.info('deleting $instance as it has terminated.');
    }

    inProgress[name] = clock.now();
    pendingDeletes.add(
      Future.microtask(() async {
        try {
          await cloudCompute.delete(instance.zone, name);
        } catch (e, st) {
          // A failed delete is logged only; the instance stays recorded as
          // in progress until its entry expires.
          _log.severe('Failed to delete $instance', e, st);
        }
      }),
    );
  }

  await Future.wait(pendingDeletes);
  return DeleteInstancesState(deletions: inProgress);
}

app/lib/task/scheduler.dart

Lines changed: 11 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -60,64 +60,25 @@ Future<void> schedule(
6060
}
6161
}
6262

63-
// Set of `CloudInstance.instanceName`s currently being deleted.
64-
// This to avoid deleting instances where the deletion process is still
65-
// running.
66-
final deletionInProgress = <String>{};
67-
6863
// Create a fast RNG with random seed for picking zones.
6964
final rng = Random(Random.secure().nextInt(2 << 31));
7065

66+
bool isAbortedFn() => !claim.valid || abort.isCompleted;
67+
7168
// Run scheduling iterations, so long as we have a valid claim
72-
while (claim.valid && !abort.isCompleted) {
69+
while (!isAbortedFn()) {
7370
final iterationStart = clock.now();
7471
_log.info('Starting scheduling cycle');
7572

76-
// Count number of instances, and delete old instances
77-
var instances = 0;
78-
await for (final instance in compute.listInstances()) {
79-
instances += 1; // count the instance
80-
81-
// If terminated or older than maxInstanceAge, delete the instance...
82-
final isTerminated = instance.state == InstanceState.terminated;
83-
final isTooOld = instance.created
84-
.add(Duration(hours: activeConfiguration.maxTaskRunHours))
85-
.isBefore(clock.now());
86-
// Also check deletionInProgress to prevent multiple calls to delete the
87-
// same instance
88-
final isBeingDeleted = deletionInProgress.contains(instance.instanceName);
89-
if ((isTerminated || isTooOld) && !isBeingDeleted) {
90-
if (isTooOld) {
91-
// This indicates that something is wrong the with the instance,
92-
// ideally it should have detected its own deadline being violated
93-
// and terminated on its own. Of course, this can fail for arbitrary
94-
// reasons in a distributed system.
95-
_log.warning('terminating $instance for being too old!');
96-
} else if (isTerminated) {
97-
_log.info('deleting $instance as it has terminated.');
98-
}
99-
100-
deletionInProgress.add(instance.instanceName);
101-
scheduleMicrotask(() async {
102-
final deletionStart = clock.now();
103-
try {
104-
await compute.delete(instance.zone, instance.instanceName);
105-
} catch (e, st) {
106-
_log.severe('Failed to delete $instance', e, st);
107-
} finally {
108-
// Wait at-least 5 minutes from start of deletion until we remove
109-
// it from [deletionInProgress] that way we give the API some time
110-
// reconcile state.
111-
await sleepOrAborted(Duration(minutes: 5), since: deletionStart);
112-
deletionInProgress.remove(instance.instanceName);
113-
}
114-
});
115-
}
73+
final instances = await compute.listInstances().toList();
74+
if (isAbortedFn()) {
75+
break;
11676
}
77+
11778
_log.info('Found $instances instances');
11879

11980
// If we are not allowed to create new instances within the allowed quota,
120-
if (activeConfiguration.maxTaskInstances <= instances) {
81+
if (activeConfiguration.maxTaskInstances <= instances.length) {
12182
_log.info('Reached instance limit, trying again in 30s');
12283
// Wait 30 seconds then list instances again, so that we can count them
12384
await sleepOrAborted(Duration(seconds: 30), since: iterationStart);
@@ -145,7 +106,7 @@ Future<void> schedule(
145106
var pendingPackagesReviewed = 0;
146107
final selectLimit = min(
147108
_maxInstancesPerIteration,
148-
max(0, activeConfiguration.maxTaskInstances - instances),
109+
max(0, activeConfiguration.maxTaskInstances - instances.length),
149110
);
150111

151112
Future<void> scheduleInstance(({String package}) selected) async {
@@ -249,14 +210,14 @@ Future<void> schedule(
249210

250211
// If there were no pending packages reviewed, and no instances currently
251212
// running, then we can easily sleep 5 minutes before we poll again.
252-
if (instances == 0 && pendingPackagesReviewed == 0) {
213+
if (instances.isEmpty && pendingPackagesReviewed == 0) {
253214
await sleepOrAborted(Duration(minutes: 5));
254215
continue;
255216
}
256217

257218
// If more tasks are available and the quota wasn't used up, we only sleep 10s
258219
if (pendingPackagesReviewed >= _maxInstancesPerIteration &&
259-
activeConfiguration.maxTaskInstances > instances) {
220+
activeConfiguration.maxTaskInstances > instances.length) {
260221
await sleepOrAborted(Duration(seconds: 10), since: iterationStart);
261222
continue;
262223
}

0 commit comments

Comments
 (0)