Skip to content

Commit 34dc70b

Browse files
authored
Refactoring cloud compute instance creation with external state of banned zones. (#9115)
1 parent 527752a commit 34dc70b

File tree

6 files changed

+213
-248
lines changed

6 files changed

+213
-248
lines changed

app/lib/task/backend.dart

Lines changed: 40 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ class TaskBackend {
104104
ScanPackagesUpdatedState _scanPackagesUpdatedState =
105105
ScanPackagesUpdatedState.init();
106106
DeleteInstancesState _deleteInstancesState = DeleteInstancesState.init();
107+
CreateInstancesState _createInstanesState = CreateInstancesState.init();
107108

108109
TaskBackend(this._db, this._bucket);
109110

@@ -127,28 +128,22 @@ class TaskBackend {
127128
final scanLoop = _createLoop(
128129
name: 'scan-packages',
129130
aborted: aborted,
130-
fn: (claim, aborted) async {
131-
await _scanForPackageUpdates(claim, abort: aborted);
132-
},
131+
fn: _runOneScanPackagesUpdate,
133132
);
134133
final deleteLoop = _createLoop(
135134
name: 'delete-instances',
136135
aborted: aborted,
137-
fn: (claim, aborted) async {
138-
await _scanForInstanceDeletion(claim, abort: aborted);
139-
},
136+
fn: _runOneInstanceDeletion,
140137
);
141-
final scheduleLoop = _createLoop(
142-
name: 'schedule',
138+
final createLoop = _createLoop(
139+
name: 'create-instances',
143140
aborted: aborted,
144-
fn: (claim, aborted) async {
145-
await schedule(claim, taskWorkerCloudCompute, _db, abort: aborted);
146-
},
141+
fn: _runOneInstanceCreation,
147142
);
148143

149144
scheduleMicrotask(() async {
150145
// Wait for background process to finish
151-
await Future.wait([scanLoop, deleteLoop, scheduleLoop]);
146+
await Future.wait([scanLoop, deleteLoop, createLoop]);
152147

153148
// Report background processes as stopped
154149
stopped.complete();
@@ -158,7 +153,7 @@ class TaskBackend {
158153
Future<void> _createLoop({
159154
required String name,
160155
required Completer aborted,
161-
required Future<void> Function(GlobalLockClaim claim, Completer aborted) fn,
156+
required Future<Duration> Function(bool Function() isAbortedFn) fn,
162157
}) {
163158
return Future.microtask(() async {
164159
try {
@@ -172,7 +167,19 @@ class TaskBackend {
172167
while (!aborted.isCompleted) {
173168
try {
174169
await lock.withClaim((claim) async {
175-
await fn(claim, aborted);
170+
bool isAbortedFn() => !claim.valid || aborted.isCompleted;
171+
while (!isAbortedFn()) {
172+
final delay = await fn(isAbortedFn);
173+
174+
if (isAbortedFn()) {
175+
return;
176+
}
177+
// Wait until aborted or [delay] before doing it again!
178+
await aborted.future.timeoutWithClock(
179+
delay,
180+
onTimeout: () => null,
181+
);
182+
}
176183
}, abort: aborted);
177184
} catch (e, st) {
178185
// Log this as very bad, and then move on. Nothing good can come
@@ -289,33 +296,10 @@ class TaskBackend {
289296
}
290297
}
291298

292-
/// Scan for updates from packages until [abort] is resolved, or [claim]
293-
/// is lost.
294-
Future<void> _scanForPackageUpdates(
295-
GlobalLockClaim claim, {
296-
Completer<void>? abort,
297-
}) async {
298-
abort ??= Completer<void>();
299-
300-
bool isAbortedFn() => !claim.valid || abort!.isCompleted;
301-
while (!isAbortedFn()) {
302-
await _runOneScanPackagesUpdate(isAbortedFn: isAbortedFn);
303-
304-
if (isAbortedFn()) {
305-
return;
306-
}
307-
// Wait until aborted or 10 minutes before scanning again!
308-
await abort.future.timeoutWithClock(
309-
Duration(minutes: 10),
310-
onTimeout: () => null,
311-
);
312-
}
313-
}
314-
315-
Future<void> _runOneScanPackagesUpdate({
316-
required bool Function() isAbortedFn,
317-
}) async {
318-
final next = await calculateScanPackagesUpdatedLoop(
299+
Future<Duration> _runOneScanPackagesUpdate(
300+
bool Function() isAbortedFn,
301+
) async {
302+
final next = await runOneScanPackagesUpdatedCycle(
319303
_scanPackagesUpdatedState,
320304
_db.packages.listUpdatedSince(_scanPackagesUpdatedState.since),
321305
isAbortedFn,
@@ -324,45 +308,33 @@ class TaskBackend {
324308

325309
for (final p in next.packages) {
326310
if (isAbortedFn()) {
327-
return;
311+
return Duration(minutes: 10);
328312
}
329313
// Check the package
330314
await trackPackage(p, updateDependents: true);
331315
}
332-
}
333-
334-
/// Scan for compute instances that can be deleted until [abort] is resolved,
335-
/// or [claim] is lost.
336-
Future<void> _scanForInstanceDeletion(
337-
GlobalLockClaim claim, {
338-
Completer<void>? abort,
339-
}) async {
340-
abort ??= Completer<void>();
341316

342-
bool isAbortedFn() => !claim.valid || abort!.isCompleted;
343-
while (!isAbortedFn()) {
344-
await _runOneInstanceDeletion(isAbortedFn: isAbortedFn);
345-
346-
if (isAbortedFn()) {
347-
return;
348-
}
349-
// Wait until aborted or 10 minutes before scanning again!
350-
await abort.future.timeoutWithClock(
351-
Duration(minutes: 10),
352-
onTimeout: () => null,
353-
);
354-
}
317+
return Duration(minutes: 10); // TODO: consider if we scan more frequently.
355318
}
356319

357-
Future<void> _runOneInstanceDeletion({
358-
required bool Function() isAbortedFn,
359-
}) async {
360-
_deleteInstancesState = await scanAndDeleteInstances(
320+
Future<Duration> _runOneInstanceDeletion(bool Function() isAbortedFn) async {
321+
_deleteInstancesState = await runOneDeleteInstancesCycle(
361322
_deleteInstancesState,
362323
taskWorkerCloudCompute,
363324
isAbortedFn,
364325
maxTaskRunHours: activeConfiguration.maxTaskRunHours,
365326
);
327+
return Duration(minutes: 10); // TODO: consider if this should be dynamic
328+
}
329+
330+
Future<Duration> _runOneInstanceCreation(bool Function() isAbortedFn) async {
331+
final result = await runOneCreateInstancesCycle(
332+
taskWorkerCloudCompute,
333+
_db,
334+
state: _createInstanesState,
335+
);
336+
_createInstanesState = result.$1;
337+
return result.$2;
366338
}
367339

368340
Future<void> trackPackage(

app/lib/task/loops/delete_instances.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ final class DeleteInstancesState {
2424

2525
/// Calculates the next state of delete instances loop by processing
2626
/// the input from [cloudCompute].
27-
Future<DeleteInstancesState> scanAndDeleteInstances(
27+
Future<DeleteInstancesState> runOneDeleteInstancesCycle(
2828
DeleteInstancesState state,
2929
CloudCompute cloudCompute,
3030
bool Function() isAbortedFn, {

app/lib/task/loops/scan_packages_updated.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ScanPackagesUpdatedNextState {
4848

4949
/// Calculates the next state of scan packages updated loop by
5050
/// processing the input [stream].
51-
Future<ScanPackagesUpdatedNextState> calculateScanPackagesUpdatedLoop(
51+
Future<ScanPackagesUpdatedNextState> runOneScanPackagesUpdatedCycle(
5252
ScanPackagesUpdatedState state,
5353
Stream<({String name, DateTime updated})> stream,
5454
bool Function() isAbortedFn,

0 commit comments

Comments
 (0)