Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'package:gcloud/storage.dart' show Bucket;
import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange;
import 'package:logging/logging.dart' show Logger;
import 'package:meta/meta.dart';
import 'package:pana/models.dart' show Summary;
import 'package:pool/pool.dart' show Pool;
import 'package:pub_dev/package/api_export/api_exporter.dart';
Expand Down Expand Up @@ -128,17 +129,17 @@ class TaskBackend {
final scanLoop = _createLoop(
name: 'scan-packages',
aborted: aborted,
fn: _runOneScanPackagesUpdate,
fn: runOneScanPackagesUpdate,
);
final deleteLoop = _createLoop(
name: 'delete-instances',
aborted: aborted,
fn: _runOneInstanceDeletion,
fn: runOneInstanceDeletion,
);
final createLoop = _createLoop(
name: 'create-instances',
aborted: aborted,
fn: _runOneInstanceCreation,
fn: runOneInstanceCreation,
);

scheduleMicrotask(() async {
Expand Down Expand Up @@ -296,9 +297,8 @@ class TaskBackend {
}
}

Future<Duration> _runOneScanPackagesUpdate(
bool Function() isAbortedFn,
) async {
@visibleForTesting
Future<Duration> runOneScanPackagesUpdate(bool Function() isAbortedFn) async {
final next = await runOneScanPackagesUpdatedCycle(
_scanPackagesUpdatedState,
_db.packages.listUpdatedSince(_scanPackagesUpdatedState.since),
Expand All @@ -317,7 +317,8 @@ class TaskBackend {
return Duration(minutes: 10); // TODO: consider if we scan more frequently.
}

Future<Duration> _runOneInstanceDeletion(bool Function() isAbortedFn) async {
@visibleForTesting
Future<Duration> runOneInstanceDeletion(bool Function() isAbortedFn) async {
_deleteInstancesState = await runOneDeleteInstancesCycle(
_deleteInstancesState,
taskWorkerCloudCompute,
Expand All @@ -327,7 +328,8 @@ class TaskBackend {
return Duration(minutes: 10); // TODO: consider if this should be dynamic
}

Future<Duration> _runOneInstanceCreation(bool Function() isAbortedFn) async {
@visibleForTesting
Future<Duration> runOneInstanceCreation(bool Function() isAbortedFn) async {
final result = await runOneCreateInstancesCycle(
taskWorkerCloudCompute,
_db,
Expand All @@ -337,6 +339,14 @@ class TaskBackend {
return result.$2;
}

@visibleForTesting
Future<void> runOneLoopCycle() async {
bool isAbortedFn() => false;
await runOneScanPackagesUpdate(isAbortedFn);
await runOneInstanceDeletion(isAbortedFn);
await runOneInstanceCreation(isAbortedFn);
}

Future<void> trackPackage(
String packageName, {
bool updateDependents = false,
Expand Down
25 changes: 25 additions & 0 deletions app/lib/task/clock_control.dart
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,31 @@ final class ClockController {
await Future.delayed(Duration(microseconds: 0));
}
}

void incrOffset({int hours = 0, int minutes = 0, int seconds = 0}) {
_offset += Duration(hours: hours, minutes: minutes, seconds: seconds);
}

Future<void> incrUntil(
FutureOr<bool> Function() condition, {
Duration? timeout,
Duration? minimumStep,
}) async {
final deadline = timeout != null ? clock.fromNowBy(timeout) : null;

bool shouldLoop() => deadline == null || clock.now().isBefore(deadline);

while (shouldLoop()) {
if (await condition()) {
return;
}
_offset += minimumStep ?? Duration(minutes: 1);
}
throw TimeoutException(
'Condition given to ClockController.incrUntil was not satisfied'
' before timeout: $timeout',
);
}
}

final class _TravelingTimer {
Expand Down
Loading