Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 40 additions & 68 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class TaskBackend {
ScanPackagesUpdatedState _scanPackagesUpdatedState =
ScanPackagesUpdatedState.init();
DeleteInstancesState _deleteInstancesState = DeleteInstancesState.init();
CreateInstancesState _createInstanesState = CreateInstancesState.init();

TaskBackend(this._db, this._bucket);

Expand All @@ -127,28 +128,22 @@ class TaskBackend {
final scanLoop = _createLoop(
name: 'scan-packages',
aborted: aborted,
fn: (claim, aborted) async {
await _scanForPackageUpdates(claim, abort: aborted);
},
fn: _runOneScanPackagesUpdate,
);
final deleteLoop = _createLoop(
name: 'delete-instances',
aborted: aborted,
fn: (claim, aborted) async {
await _scanForInstanceDeletion(claim, abort: aborted);
},
fn: _runOneInstanceDeletion,
);
final scheduleLoop = _createLoop(
name: 'schedule',
final createLoop = _createLoop(
name: 'create-instances',
aborted: aborted,
fn: (claim, aborted) async {
await schedule(claim, taskWorkerCloudCompute, _db, abort: aborted);
},
fn: _runOneInstanceCreation,
);

scheduleMicrotask(() async {
// Wait for background process to finish
await Future.wait([scanLoop, deleteLoop, scheduleLoop]);
await Future.wait([scanLoop, deleteLoop, createLoop]);

// Report background processes as stopped
stopped.complete();
Expand All @@ -158,7 +153,7 @@ class TaskBackend {
Future<void> _createLoop({
required String name,
required Completer aborted,
required Future<void> Function(GlobalLockClaim claim, Completer aborted) fn,
required Future<Duration> Function(bool Function() isAbortedFn) fn,
}) {
return Future.microtask(() async {
try {
Expand All @@ -172,7 +167,19 @@ class TaskBackend {
while (!aborted.isCompleted) {
try {
await lock.withClaim((claim) async {
await fn(claim, aborted);
bool isAbortedFn() => !claim.valid || aborted.isCompleted;
while (!isAbortedFn()) {
final delay = await fn(isAbortedFn);

if (isAbortedFn()) {
return;
}
// Wait until aborted or [delay] before doing it again!
await aborted.future.timeoutWithClock(
delay,
onTimeout: () => null,
);
}
}, abort: aborted);
} catch (e, st) {
// Log this as very bad, and then move on. Nothing good can come
Expand Down Expand Up @@ -289,33 +296,10 @@ class TaskBackend {
}
}

/// Scan for updates from packages until [abort] is resolved, or [claim]
/// is lost.
Future<void> _scanForPackageUpdates(
GlobalLockClaim claim, {
Completer<void>? abort,
}) async {
abort ??= Completer<void>();

bool isAbortedFn() => !claim.valid || abort!.isCompleted;
while (!isAbortedFn()) {
await _runOneScanPackagesUpdate(isAbortedFn: isAbortedFn);

if (isAbortedFn()) {
return;
}
// Wait until aborted or 10 minutes before scanning again!
await abort.future.timeoutWithClock(
Duration(minutes: 10),
onTimeout: () => null,
);
}
}

Future<void> _runOneScanPackagesUpdate({
required bool Function() isAbortedFn,
}) async {
final next = await calculateScanPackagesUpdatedLoop(
Future<Duration> _runOneScanPackagesUpdate(
bool Function() isAbortedFn,
) async {
final next = await runOneScanPackagesUpdatedCycle(
_scanPackagesUpdatedState,
_db.packages.listUpdatedSince(_scanPackagesUpdatedState.since),
isAbortedFn,
Expand All @@ -324,45 +308,33 @@ class TaskBackend {

for (final p in next.packages) {
if (isAbortedFn()) {
return;
return Duration(minutes: 10);
}
// Check the package
await trackPackage(p, updateDependents: true);
}
}

/// Scan for compute instances that can be deleted until [abort] is resolved,
/// or [claim] is lost.
Future<void> _scanForInstanceDeletion(
GlobalLockClaim claim, {
Completer<void>? abort,
}) async {
abort ??= Completer<void>();

bool isAbortedFn() => !claim.valid || abort!.isCompleted;
while (!isAbortedFn()) {
await _runOneInstanceDeletion(isAbortedFn: isAbortedFn);

if (isAbortedFn()) {
return;
}
// Wait until aborted or 10 minutes before scanning again!
await abort.future.timeoutWithClock(
Duration(minutes: 10),
onTimeout: () => null,
);
}
return Duration(minutes: 10); // TODO: consider if we scan more frequently.
}

Future<void> _runOneInstanceDeletion({
required bool Function() isAbortedFn,
}) async {
_deleteInstancesState = await scanAndDeleteInstances(
Future<Duration> _runOneInstanceDeletion(bool Function() isAbortedFn) async {
_deleteInstancesState = await runOneDeleteInstancesCycle(
_deleteInstancesState,
taskWorkerCloudCompute,
isAbortedFn,
maxTaskRunHours: activeConfiguration.maxTaskRunHours,
);
return Duration(minutes: 10); // TODO: consider if this should be dynamic
}

Future<Duration> _runOneInstanceCreation(bool Function() isAbortedFn) async {
final result = await runOneCreateInstancesCycle(
taskWorkerCloudCompute,
_db,
state: _createInstanesState,
);
_createInstanesState = result.$1;
return result.$2;
}

Future<void> trackPackage(
Expand Down
2 changes: 1 addition & 1 deletion app/lib/task/loops/delete_instances.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ final class DeleteInstancesState {

/// Calculates the next state of delete instances loop by processing
/// the input from [cloudCompute].
Future<DeleteInstancesState> scanAndDeleteInstances(
Future<DeleteInstancesState> runOneDeleteInstancesCycle(
DeleteInstancesState state,
CloudCompute cloudCompute,
bool Function() isAbortedFn, {
Expand Down
2 changes: 1 addition & 1 deletion app/lib/task/loops/scan_packages_updated.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ScanPackagesUpdatedNextState {

/// Calculates the next state of scan packages updated loop by
/// processing the input [stream].
Future<ScanPackagesUpdatedNextState> calculateScanPackagesUpdatedLoop(
Future<ScanPackagesUpdatedNextState> runOneScanPackagesUpdatedCycle(
ScanPackagesUpdatedState state,
Stream<({String name, DateTime updated})> stream,
bool Function() isAbortedFn,
Expand Down
Loading