Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 33 additions & 58 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,70 +121,54 @@ class TaskBackend {
final aborted = _aborted = Completer();
final stopped = _stopped = Completer();

// Start scanning for packages to be tracked
final _doneScanning = Completer<void>();
final scanLoop = _createLoop(
name: 'scan-packages',
aborted: aborted,
fn: (claim, aborted) async {
await _scanForPackageUpdates(claim, abort: aborted);
},
);
final scheduleLoop = _createLoop(
name: 'schedule',
aborted: aborted,
fn: (claim, aborted) async {
await schedule(claim, taskWorkerCloudCompute, _db, abort: aborted);
},
);

scheduleMicrotask(() async {
try {
// Create a lock for task scheduling, so tasks
final lock = GlobalLock.create(
'$runtimeVersion/task/scanning',
expiration: Duration(minutes: 25),
);
// Wait for background process to finish
await Future.wait([scanLoop, scheduleLoop]);

while (!aborted.isCompleted) {
// Acquire the global lock and scan for package changes while lock is
// valid.
try {
await lock.withClaim((claim) async {
await _scanForPackageUpdates(claim, abort: aborted);
}, abort: aborted);
} catch (e, st) {
// Log this as very bad, and then move on. Nothing good can come
// from straight up stopping.
_log.shout(
'scanning failed (will retry when lock becomes free)',
e,
st,
);
// Sleep 5 minutes to reduce risk of degenerate behavior
await clock.delayed(Duration(minutes: 5));
}
}
} catch (e, st) {
_log.severe('scanning loop crashed', e, st);
} finally {
_log.info('scanning loop stopped');
_doneScanning.complete();
}
// Report background processes as stopped
stopped.complete();
});
}

// Start background task to schedule tasks
final _doneScheduling = Completer<void>();
scheduleMicrotask(() async {
Future<void> _createLoop({
required String name,
required Completer aborted,
required Future<void> Function(GlobalLockClaim claim, Completer aborted) fn,
}) {
return Future.microtask(() async {
try {
// Create a lock for task scheduling, so tasks
// A lock for this task loop makes sure we only have one
// process at the time that tries to update the state.
final lock = GlobalLock.create(
'$runtimeVersion/task/scheduler',
'$runtimeVersion/task/$name-loop',
expiration: Duration(minutes: 25),
);

while (!aborted.isCompleted) {
// Acquire the global lock and create VMs for pending packages, and
// kill overdue VMs.
try {
await lock.withClaim((claim) async {
await schedule(
claim,
taskWorkerCloudCompute,
_db,
abort: aborted,
);
await fn(claim, aborted);
}, abort: aborted);
} catch (e, st) {
// Log this as very bad, and then move on. Nothing good can come
// from straight up stopping.
_log.shout(
'scheduling iteration failed (will retry when lock becomes free)',
'task loop $name failed (will retry when lock becomes free)',
e,
st,
);
Expand All @@ -193,20 +177,11 @@ class TaskBackend {
}
}
} catch (e, st) {
_log.severe('scheduling loop crashed', e, st);
_log.severe('task loop $name crashed', e, st);
} finally {
_log.info('scheduling loop stopped');
_doneScheduling.complete();
_log.info('task loop $name stopped');
}
});

scheduleMicrotask(() async {
// Wait for background process to finish
await Future.wait([_doneScanning.future, _doneScheduling.future]);

// Report background processes as stopped
stopped.complete();
});
}

/// Stop any background process that may be running.
Expand Down