From 548483a34141510d5f72eed6708e0e4a4317404c Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Wed, 3 Dec 2025 14:10:09 +0100 Subject: [PATCH 1/4] Refactor: track compute zones bans and round-robin cursor in a separate component. --- app/lib/task/cloudcompute/zone_tracker.dart | 112 ++++++++++++++++++ app/lib/task/scheduler.dart | 100 +++------------- .../task/cloudcompute/zone_tracker_test.dart | 98 +++++++++++++++ 3 files changed, 229 insertions(+), 81 deletions(-) create mode 100644 app/lib/task/cloudcompute/zone_tracker.dart create mode 100644 app/test/task/cloudcompute/zone_tracker_test.dart diff --git a/app/lib/task/cloudcompute/zone_tracker.dart b/app/lib/task/cloudcompute/zone_tracker.dart new file mode 100644 index 0000000000..4f7963ce30 --- /dev/null +++ b/app/lib/task/cloudcompute/zone_tracker.dart @@ -0,0 +1,112 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:clock/clock.dart'; +import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; +import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; + +final _log = Logger('pub.task.zone_tracker'); + +/// Tracks compute zones and custom ban periods. +class ComputeZoneTracker { + final List _zones; + final _bannedUntil = {}; + + int _cursor = 0; + + ComputeZoneTracker(this._zones); + + /// Creates or extends zone ban period. + void banZone(String zone, {int minutes = 0}) { + final until = clock.fromNow(minutes: minutes); + final currentBan = _bannedUntil[zone]; + if (currentBan == null || currentBan.isBefore(until)) { + _bannedUntil[zone] = until; + } + } + + void _pruneBans() { + final now = clock.now(); + _bannedUntil.removeWhere((k, v) => v.isBefore(now)); + } + + /// Whether there is any available zone that is not banned. 
+ bool hasAvailableZone() { + _pruneBans(); + return _zones.any((zone) => !_bannedUntil.containsKey(zone)); + } + + /// Tries to pick an available zone. + /// + /// Zone selection follows a round-robin algorithm, skipping the banned zones. + /// + /// Returns `null` if there is no zone available. + String? tryPickZone() { + _pruneBans(); + // cursor may be moved at most the number of zones times + for (var i = 0; i < _zones.length; i++) { + final zone = _zones[_cursor]; + _cursor = (_cursor + 1) % _zones.length; + if (!_bannedUntil.containsKey(zone)) { + return zone; + } + } + return null; + } + + @visibleForTesting + List tryPickZones(int n) { + return List.generate(n, (_) => tryPickZone()); + } + + /// Executes [fn] in compute [zone] and handles zone-related exceptions + /// with the appropriate bans. + Future withZoneAndInstance( + String zone, + String instanceName, + Future Function() fn, + ) async { + try { + await fn(); + } on ZoneExhaustedException catch (e, st) { + // A zone being exhausted is normal operations, we just use another + // zone for 30 minutes. + _log.info( + 'zone resources exhausted, banning ${e.zone} for 30 minutes', + e, + st, + ); + // Ban usage of zone for 30 minutes + banZone(e.zone, minutes: 30); + } on QuotaExhaustedException catch (e, st) { + // Quota exhausted, this can happen, but it shouldn't. We'll just stop + // doing anything for 10 minutes. Hopefully that'll resolve the issue. + // We log severe, because this is a reason to adjust the quota or + // instance limits.
+ _log.severe( + 'Quota exhausted trying to create $instanceName, banning all zones ' + 'for 10 minutes', + e, + st, + ); + + // Ban all zones for 10 minutes + for (final zone in _zones) { + banZone(zone, minutes: 10); + } + } on Exception catch (e, st) { + // No idea what happened, but for robustness we'll stop using the zone + // and shout into the logs + _log.shout( + 'Failed to create instance $instanceName, banning zone "$zone" for ' + '15 minutes', + e, + st, + ); + // Ban usage of zone for 15 minutes + banZone(zone, minutes: 15); + } + } +} diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index e69e87ad7e..82aadb2b16 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -13,6 +13,7 @@ import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; +import 'package:pub_dev/task/cloudcompute/zone_tracker.dart'; import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/models.dart'; @@ -43,31 +44,13 @@ Future schedule( } } - // Map from zone to DateTime when zone is allowed again - final zoneBannedUntil = { - for (final zone in compute.zones) zone: DateTime(0), - }; - void banZone(String zone, {int minutes = 0, int hours = 0, int days = 0}) { - if (!zoneBannedUntil.containsKey(zone)) { - throw ArgumentError.value(zone, 'zone'); - } - - final until = clock.now().add( - Duration(minutes: minutes, hours: hours, days: days), - ); - if (zoneBannedUntil[zone]!.isBefore(until)) { - zoneBannedUntil[zone] = until; - } - } + final zoneTracker = ComputeZoneTracker(compute.zones); // Set of `CloudInstance.instanceName`s currently being deleted. // This to avoid deleting instances where the deletion process is still // running. final deletionInProgress = {}; - // Create a fast RNG with random seed for picking zones. 
- final rng = Random(Random.secure().nextInt(2 << 31)); - // Run scheduling iterations, so long as we have a valid claim while (claim.valid && !abort.isCompleted) { final iterationStart = clock.now(); @@ -124,18 +107,8 @@ Future schedule( continue; // skip the rest of the iteration } - // Determine which zones are not banned - final allowedZones = - zoneBannedUntil.entries - .where((e) => e.value.isBefore(clock.now())) - .map((e) => e.key) - .toList() - ..shuffle(rng); - var nextZoneIndex = 0; - String pickZone() => allowedZones[nextZoneIndex++ % allowedZones.length]; - // If no zones are available, we sleep and try again later. - if (allowedZones.isEmpty) { + if (!zoneTracker.hasAvailableZone()) { _log.info('All compute-engine zones are banned, trying again in 30s'); await sleepOrAborted(Duration(seconds: 30), since: iterationStart); continue; @@ -152,7 +125,10 @@ Future schedule( pendingPackagesReviewed += 1; final instanceName = compute.generateInstanceName(); - final zone = pickZone(); + final zone = zoneTracker.tryPickZone(); + if (zone == null) { + return; + } final updated = await updatePackageStateWithPendingVersions( db, @@ -171,7 +147,7 @@ Future schedule( await Future.microtask(() async { var rollbackPackageState = true; - try { + await zoneTracker.withZoneAndInstance(zone, instanceName, () async { // Purging cache is important for the edge case, where the new upload happens // on a different runtime version, and the current one's cache is still stale // and does not have the version yet. @@ -189,56 +165,18 @@ Future schedule( description: description, ); rollbackPackageState = false; - } on ZoneExhaustedException catch (e, st) { - // A zone being exhausted is normal operations, we just use another - // zone for 15 minutes. 
- _log.info( - 'zone resources exhausted, banning ${e.zone} for 30 minutes', - e, - st, - ); - // Ban usage of zone for 30 minutes - banZone(e.zone, minutes: 30); - } on QuotaExhaustedException catch (e, st) { - // Quota exhausted, this can happen, but it shouldn't. We'll just stop - // doing anything for 10 minutes. Hopefully that'll resolve the issue. - // We log severe, because this is a reason to adjust the quota or - // instance limits. - _log.severe( - 'Quota exhausted trying to create $instanceName, banning all zones ' - 'for 10 minutes', - e, - st, - ); - - // Ban all zones for 10 minutes - for (final zone in compute.zones) { - banZone(zone, minutes: 10); - } - } on Exception catch (e, st) { - // No idea what happened, but for robustness we'll stop using the zone - // and shout into the logs - _log.shout( - 'Failed to create instance $instanceName, banning zone "$zone" for ' - '15 minutes', - e, - st, + }); + if (rollbackPackageState) { + final oldVersionsMap = updated?.$2 ?? const {}; + // Restore the state of the PackageState for versions that were + // supposed to run on the instance we just failed to create. + // If this doesn't work, we'll eventually retry. Hence, correctness + // does not hinge on this transaction being successful. + await db.tasks.restorePreviousVersionsState( + selected.package, + instanceName, + oldVersionsMap, ); - // Ban usage of zone for 15 minutes - banZone(zone, minutes: 15); - } finally { - if (rollbackPackageState) { - final oldVersionsMap = updated?.$2 ?? const {}; - // Restore the state of the PackageState for versions that were - // suppose to run on the instance we just failed to create. - // If this doesn't work, we'll eventually retry. Hence, correctness - // does not hinge on this transaction being successful.
- await db.tasks.restorePreviousVersionsState( - selected.package, - instanceName, - oldVersionsMap, - ); - } } }); } diff --git a/app/test/task/cloudcompute/zone_tracker_test.dart b/app/test/task/cloudcompute/zone_tracker_test.dart new file mode 100644 index 0000000000..e0cf2254a2 --- /dev/null +++ b/app/test/task/cloudcompute/zone_tracker_test.dart @@ -0,0 +1,98 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:clock/clock.dart'; +import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; +import 'package:pub_dev/task/cloudcompute/zone_tracker.dart'; +import 'package:test/test.dart'; + +void main() { + group('ComputeZoneTracker', () { + test('no zones provided', () { + final tracker = ComputeZoneTracker([]); + expect(tracker.hasAvailableZone(), false); + expect(tracker.tryPickZone(), null); + }); + + test('unrelated zone gets banned', () { + final tracker = ComputeZoneTracker(['a']); + expect(tracker.hasAvailableZone(), true); + expect(tracker.tryPickZone(), 'a'); + expect(tracker.tryPickZone(), 'a'); + + tracker.banZone('b', minutes: 2); + expect(tracker.tryPickZone(), 'a'); + }); + + test('single zone gets banned and ban expires', () { + final tracker = ComputeZoneTracker(['a']); + expect(tracker.hasAvailableZone(), true); + expect(tracker.tryPickZone(), 'a'); + expect(tracker.tryPickZone(), 'a'); + + tracker.banZone('a', minutes: 2); + expect(tracker.tryPickZone(), null); + + withClock(Clock.fixed(clock.fromNow(minutes: 3)), () { + expect(tracker.tryPickZone(), 'a'); + }); + }); + + test('round robin with one zone banned', () { + final tracker = ComputeZoneTracker(['a', 'b', 'c']); + expect(tracker.hasAvailableZone(), true); + expect(tracker.tryPickZones(7), ['a', 'b', 'c', 'a', 'b', 'c', 'a']); + + tracker.banZone('b', minutes: 2); + expect(tracker.tryPickZones(5), 
['c', 'a', 'c', 'a', 'c']); + + withClock(Clock.fixed(clock.fromNow(minutes: 30)), () { + expect(tracker.tryPickZones(5), ['a', 'b', 'c', 'a', 'b']); + }); + }); + + test('ZoneExhaustedException bans single zone', () async { + final tracker = ComputeZoneTracker(['a', 'b', 'c']); + await tracker.withZoneAndInstance( + 'a', + 'instance-a', + () => throw ZoneExhaustedException('a', 'exhausted'), + ); + expect(tracker.tryPickZones(6), ['b', 'c', 'b', 'c', 'b', 'c']); + + withClock(Clock.fixed(clock.fromNow(minutes: 30)), () { + expect(tracker.tryPickZones(5), ['a', 'b', 'c', 'a', 'b']); + }); + }); + + test('QuotaExhaustedException bans all zones', () async { + final tracker = ComputeZoneTracker(['a', 'b', 'c']); + await tracker.withZoneAndInstance( + 'a', + 'instance-a', + () => throw QuotaExhaustedException('exhausted'), + ); + expect(tracker.hasAvailableZone(), isFalse); + expect(tracker.tryPickZones(2), [null, null]); + + withClock(Clock.fixed(clock.fromNow(minutes: 30)), () { + expect(tracker.tryPickZones(5), ['a', 'b', 'c', 'a', 'b']); + }); + }); + + test('generic Exception bans single zone', () async { + final tracker = ComputeZoneTracker(['a', 'b', 'c']); + await tracker.withZoneAndInstance( + 'a', + 'instance-a', + () => throw Exception('unrelated'), + ); + expect(tracker.tryPickZones(6), ['b', 'c', 'b', 'c', 'b', 'c']); + + withClock(Clock.fixed(clock.fromNow(minutes: 30)), () { + expect(tracker.tryPickZones(5), ['a', 'b', 'c', 'a', 'b']); + }); + }); + }); +} From 58b69aad72280370873910d01f6d55b94f87d457 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Wed, 3 Dec 2025 18:10:16 +0100 Subject: [PATCH 2/4] _cursor -> _nextZoneIndex --- app/lib/task/cloudcompute/zone_tracker.dart | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/lib/task/cloudcompute/zone_tracker.dart b/app/lib/task/cloudcompute/zone_tracker.dart index 4f7963ce30..cf6d4c1eb2 100644 --- a/app/lib/task/cloudcompute/zone_tracker.dart +++ 
b/app/lib/task/cloudcompute/zone_tracker.dart @@ -14,7 +14,7 @@ class ComputeZoneTracker { final List _zones; final _bannedUntil = {}; - int _cursor = 0; + int _nextZoneIndex = 0; ComputeZoneTracker(this._zones); @@ -47,8 +47,8 @@ class ComputeZoneTracker { _pruneBans(); // cursor may be moved at most the number of zones times for (var i = 0; i < _zones.length; i++) { - final zone = _zones[_cursor]; - _cursor = (_cursor + 1) % _zones.length; + final zone = _zones[_nextZoneIndex]; + _nextZoneIndex = (_nextZoneIndex + 1) % _zones.length; if (!_bannedUntil.containsKey(zone)) { return zone; } From 1cdba636342b2e3c7cee89874f9d480495ba8b2a Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Wed, 3 Dec 2025 18:12:00 +0100 Subject: [PATCH 3/4] final + moving tryPickZones --- app/lib/task/cloudcompute/zone_tracker.dart | 7 +------ app/test/task/cloudcompute/zone_tracker_test.dart | 6 ++++++ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/app/lib/task/cloudcompute/zone_tracker.dart b/app/lib/task/cloudcompute/zone_tracker.dart index cf6d4c1eb2..7a14999773 100644 --- a/app/lib/task/cloudcompute/zone_tracker.dart +++ b/app/lib/task/cloudcompute/zone_tracker.dart @@ -10,7 +10,7 @@ import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; final _log = Logger('pub.task.zone_tracker'); /// Tracks compute zones and custom ban periods. -class ComputeZoneTracker { +final class ComputeZoneTracker { final List _zones; final _bannedUntil = {}; @@ -56,11 +56,6 @@ class ComputeZoneTracker { return null; } - @visibleForTesting - List tryPickZones(int n) { - return List.generate(n, (_) => tryPickZone()); - } - /// Executes [fn] in compute [zone] and handles zone-related exceptions /// with the appropriate bans. 
Future withZoneAndInstance( diff --git a/app/test/task/cloudcompute/zone_tracker_test.dart b/app/test/task/cloudcompute/zone_tracker_test.dart index e0cf2254a2..610aafa58d 100644 --- a/app/test/task/cloudcompute/zone_tracker_test.dart +++ b/app/test/task/cloudcompute/zone_tracker_test.dart @@ -96,3 +96,9 @@ void main() { }); }); } + +extension on ComputeZoneTracker { + List tryPickZones(int n) { + return List.generate(n, (_) => tryPickZone()); + } +} From 70f94dc734d4798f7b266630bfc557fb21d5a210 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Wed, 3 Dec 2025 18:23:21 +0100 Subject: [PATCH 4/4] lints --- app/lib/task/cloudcompute/zone_tracker.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/app/lib/task/cloudcompute/zone_tracker.dart b/app/lib/task/cloudcompute/zone_tracker.dart index 7a14999773..9c3ed400e0 100644 --- a/app/lib/task/cloudcompute/zone_tracker.dart +++ b/app/lib/task/cloudcompute/zone_tracker.dart @@ -4,7 +4,6 @@ import 'package:clock/clock.dart'; import 'package:logging/logging.dart'; -import 'package:meta/meta.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; final _log = Logger('pub.task.zone_tracker');