Skip to content

Commit a379a41

Browse files
Merge branch 'main' into remote-persistent-safe
2 parents b3e4746 + 0cd70ee commit a379a41

16 files changed

+278
-51
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-config/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ version = "0.7.6"
1010
nativelink-error = { path = "../nativelink-error" }
1111

1212
byte-unit = { version = "5.1.6", default-features = false, features = ["byte"] }
13-
humantime = { version = "2.2.0", default-features = false }
13+
humantime = { version = "2.3.0", default-features = false }
1414
rand = { version = "0.9.0", default-features = false, features = [
1515
"thread_rng",
1616
] }

nativelink-scheduler/src/awaited_action_db/awaited_action.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl AwaitedAction {
107107
// client_operation_id to all clients.
108108
client_operation_id: operation_id.clone(),
109109
action_digest: action_info.unique_qualifier.digest(),
110+
last_transition_timestamp: now,
110111
});
111112

112113
let ctx = Context::current();

nativelink-scheduler/src/cache_lookup_scheduler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::HashMap;
1616
use std::sync::Arc;
17+
use std::time::SystemTime;
1718

1819
use async_trait::async_trait;
1920
use nativelink_error::{Code, Error, ResultExt, make_err};
@@ -267,6 +268,7 @@ impl CacheLookupScheduler {
267268
client_operation_id: OperationId::default(),
268269
stage: ActionStage::CompletedFromCache(action_result),
269270
action_digest: action_info.unique_qualifier.digest(),
271+
last_transition_timestamp: SystemTime::now(),
270272
};
271273

272274
let ctx = Context::current();

nativelink-scheduler/src/simple_scheduler.rs

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::{BTreeSet, HashMap};
1516
use std::sync::Arc;
1617
use std::time::{Instant, SystemTime};
1718

1819
use async_trait::async_trait;
19-
use futures::Future;
20+
use futures::{Future, StreamExt, future};
2021
use nativelink_config::schedulers::SimpleSpec;
2122
use nativelink_error::{Code, Error, ResultExt};
2223
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
@@ -38,8 +39,7 @@ use opentelemetry::context::{Context, FutureExt as OtelFutureExt};
3839
use opentelemetry_semantic_conventions::attribute::ENDUSER_ID;
3940
use tokio::sync::{Notify, mpsc};
4041
use tokio::time::Duration;
41-
use tokio_stream::StreamExt;
42-
use tracing::{error, info_span};
42+
use tracing::{error, info, info_span};
4343

4444
use crate::api_worker_scheduler::ApiWorkerScheduler;
4545
use crate::awaited_action_db::{AwaitedActionDb, CLIENT_KEEPALIVE_DURATION};
@@ -627,8 +627,7 @@ impl SimpleScheduler {
627627
tokio::pin!(task_change_fut);
628628
tokio::pin!(worker_change_fut);
629629
// Wait for either of these futures to be ready.
630-
let state_changed =
631-
futures::future::select(task_change_fut, worker_change_fut);
630+
let state_changed = future::select(task_change_fut, worker_change_fut);
632631
if last_match_successful {
633632
let _ = state_changed.await;
634633
} else {
@@ -638,7 +637,7 @@ impl SimpleScheduler {
638637
// hard loop if there's something wrong inside do_try_match.
639638
let sleep_fut = tokio::time::sleep(Duration::from_millis(100));
640639
tokio::pin!(sleep_fut);
641-
let _ = futures::future::select(state_changed, sleep_fut).await;
640+
let _ = future::select(state_changed, sleep_fut).await;
642641
}
643642

644643
let result = match weak_inner.upgrade() {
@@ -656,6 +655,68 @@ impl SimpleScheduler {
656655

657656
let res = scheduler.do_try_match(full_worker_logging).await;
658657
if full_worker_logging {
658+
let operations_stream = scheduler
659+
.matching_engine_state_manager
660+
.filter_operations(OperationFilter::default())
661+
.await
662+
.err_tip(|| "In action_scheduler getting filter result");
663+
664+
let mut oldest_actions_in_state: HashMap<
665+
String,
666+
BTreeSet<Arc<ActionState>>,
667+
> = HashMap::new();
668+
let max_items = 5;
669+
670+
match operations_stream {
671+
Ok(stream) => {
672+
let actions = stream
673+
.filter_map(|item| async move {
674+
match item.as_ref().as_state().await {
675+
Ok((action_state, _origin_metadata)) => {
676+
Some(action_state)
677+
}
678+
Err(e) => {
679+
error!(
680+
?e,
681+
"Failed to get action state!"
682+
);
683+
None
684+
}
685+
}
686+
})
687+
.collect::<Vec<_>>()
688+
.await;
689+
for action_state in actions.iter() {
690+
let name = action_state.stage.name();
691+
match oldest_actions_in_state.get_mut(&name) {
692+
Some(values) => {
693+
values.insert(action_state.clone());
694+
if values.len() > max_items {
695+
values.pop_first();
696+
}
697+
}
698+
None => {
699+
let mut values = BTreeSet::new();
700+
values.insert(action_state.clone());
701+
oldest_actions_in_state
702+
.insert(name, values);
703+
}
704+
};
705+
}
706+
}
707+
Err(e) => {
708+
error!(?e, "Failed to get operations list!");
709+
}
710+
}
711+
712+
for value in oldest_actions_in_state.values() {
713+
let mut items = vec![];
714+
for item in value {
715+
items.push(item.to_string());
716+
}
717+
info!(?items, "Oldest actions in state");
718+
}
719+
659720
worker_match_logging_last.replace(now);
660721
}
661722
res

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,8 @@ where
320320
// Note: The caller must filter `client_operation_id`.
321321

322322
let mut maybe_reloaded_awaited_action: Option<AwaitedAction> = None;
323-
if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout
324-
< (self.now_fn)().now()
325-
{
323+
let now = (self.now_fn)().now();
324+
if awaited_action.last_client_keepalive_timestamp() + self.client_action_timeout < now {
326325
// This may change if the version is out of date.
327326
let mut timed_out = true;
328327
if !awaited_action.state().stage.is_finished() {
@@ -335,6 +334,7 @@ where
335334
)),
336335
..ActionResult::default()
337336
});
337+
state.last_transition_timestamp = now;
338338
let state = Arc::new(state);
339339
// We may be competing with a client timestamp update, so try
340340
// this a few times.
@@ -655,6 +655,7 @@ where
655655
// correct client id.
656656
client_operation_id: operation_id.clone(),
657657
action_digest: awaited_action.action_info().digest(),
658+
last_transition_timestamp: now,
658659
}),
659660
now,
660661
);

nativelink-scheduler/tests/action_messages_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ async fn action_state_any_url_test() -> Result<(), Error> {
4343
// Result is only populated if has_action_result.
4444
stage: ActionStage::Completed(ActionResult::default()),
4545
action_digest,
46+
last_transition_timestamp: SystemTime::now(),
4647
};
4748
let operation: Operation = action_state.as_operation(client_id);
4849

nativelink-scheduler/tests/cache_lookup_scheduler_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::sync::Arc;
16-
use std::time::UNIX_EPOCH;
16+
use std::time::{SystemTime, UNIX_EPOCH};
1717

1818
mod utils {
1919
pub(crate) mod scheduler_utils;
@@ -71,6 +71,7 @@ async fn add_action_handles_skip_cache() -> Result<(), Error> {
7171
client_operation_id: OperationId::default(),
7272
stage: ActionStage::Queued,
7373
action_digest: action_info.unique_qualifier.digest(),
74+
last_transition_timestamp: SystemTime::now(),
7475
}));
7576
let ActionUniqueQualifier::Cacheable(action_key) = action_info.unique_qualifier.clone() else {
7677
panic!("This test should be testing when item was cached first");

nativelink-scheduler/tests/property_modifier_scheduler_test.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use std::collections::HashMap;
1616
use std::sync::Arc;
17-
use std::time::UNIX_EPOCH;
17+
use std::time::{SystemTime, UNIX_EPOCH};
1818

1919
mod utils {
2020
pub(crate) mod scheduler_utils;
@@ -70,6 +70,7 @@ async fn add_action_adds_property() -> Result<(), Error> {
7070
client_operation_id: OperationId::default(),
7171
stage: ActionStage::Queued,
7272
action_digest: action_info.unique_qualifier.digest(),
73+
last_transition_timestamp: SystemTime::now(),
7374
}));
7475
let client_operation_id = OperationId::default();
7576
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -114,6 +115,7 @@ async fn add_action_overwrites_property() -> Result<(), Error> {
114115
client_operation_id: OperationId::default(),
115116
stage: ActionStage::Queued,
116117
action_digest: action_info.unique_qualifier.digest(),
118+
last_transition_timestamp: SystemTime::now(),
117119
}));
118120
let client_operation_id = OperationId::default();
119121
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -153,6 +155,7 @@ async fn add_action_property_added_after_remove() -> Result<(), Error> {
153155
client_operation_id: OperationId::default(),
154156
stage: ActionStage::Queued,
155157
action_digest: action_info.unique_qualifier.digest(),
158+
last_transition_timestamp: SystemTime::now(),
156159
}));
157160
let client_operation_id = OperationId::default();
158161
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -192,6 +195,7 @@ async fn add_action_property_remove_after_add() -> Result<(), Error> {
192195
client_operation_id: OperationId::default(),
193196
stage: ActionStage::Queued,
194197
action_digest: action_info.unique_qualifier.digest(),
198+
last_transition_timestamp: SystemTime::now(),
195199
}));
196200
let client_operation_id = OperationId::default();
197201
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -233,6 +237,7 @@ async fn add_action_property_replace() -> Result<(), Error> {
233237
client_operation_id: OperationId::default(),
234238
stage: ActionStage::Queued,
235239
action_digest: action_info.unique_qualifier.digest(),
240+
last_transition_timestamp: SystemTime::now(),
236241
}));
237242
let client_operation_id = OperationId::default();
238243
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -277,6 +282,7 @@ async fn add_action_property_replace_match_value() -> Result<(), Error> {
277282
client_operation_id: OperationId::default(),
278283
stage: ActionStage::Queued,
279284
action_digest: action_info.unique_qualifier.digest(),
285+
last_transition_timestamp: SystemTime::now(),
280286
}));
281287
let client_operation_id = OperationId::default();
282288
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -322,6 +328,7 @@ async fn add_action_property_replace_value() -> Result<(), Error> {
322328
client_operation_id: OperationId::default(),
323329
stage: ActionStage::Queued,
324330
action_digest: action_info.unique_qualifier.digest(),
331+
last_transition_timestamp: SystemTime::now(),
325332
}));
326333
let client_operation_id = OperationId::default();
327334
let (_, (passed_client_operation_id, action_info)) = join!(
@@ -359,6 +366,7 @@ async fn add_action_property_remove() -> Result<(), Error> {
359366
client_operation_id: OperationId::default(),
360367
stage: ActionStage::Queued,
361368
action_digest: action_info.unique_qualifier.digest(),
369+
last_transition_timestamp: SystemTime::now(),
362370
}));
363371
// let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::new()));
364372
let client_operation_id = OperationId::default();

nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,7 @@ async fn add_action_smoke_test() -> Result<(), Error> {
489489
let mut new_awaited_action = worker_awaited_action.clone();
490490
let mut new_state = new_awaited_action.state().as_ref().clone();
491491
new_state.stage = ActionStage::Executing;
492+
new_state.last_transition_timestamp = SystemTime::now();
492493
new_awaited_action.worker_set_state(Arc::new(new_state), MockSystemTime::now().into());
493494
new_awaited_action
494495
};

0 commit comments

Comments
 (0)