Skip to content

Commit b8632c1

Browse files
Emit GrpcMessageTooLarge as failure_reason for WFT
1 parent aed367e commit b8632c1

File tree

3 files changed

+39
-19
lines changed

3 files changed

+39
-19
lines changed

crates/sdk-core/src/telemetry/metrics.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -642,6 +642,7 @@ pub(crate) enum FailureReason {
642642
Timeout,
643643
NexusOperation(String),
644644
NexusHandlerError(String),
645+
GrpcMessageTooLarge,
645646
}
646647
impl Display for FailureReason {
647648
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -651,6 +652,7 @@ impl Display for FailureReason {
651652
FailureReason::Timeout => "timeout".to_owned(),
652653
FailureReason::NexusOperation(op) => format!("operation_{op}"),
653654
FailureReason::NexusHandlerError(op) => format!("handler_error_{op}"),
655+
FailureReason::GrpcMessageTooLarge => "GrpcMessageTooLarge".to_owned(),
654656
};
655657
write!(f, "{str}")
656658
}

crates/sdk-core/src/worker/workflow/mod.rs

Lines changed: 13 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -15,16 +15,10 @@ pub(crate) use driven_workflow::DrivenWorkflow;
1515
pub(crate) use history_update::HistoryUpdate;
1616

1717
use crate::{
18-
MetricsContext,
19-
abstractions::{
18+
MetricsContext, abstractions::{
2019
MeteredPermitDealer, TrackedOwnedMeteredSemPermit, UsedMeteredSemPermit, dbg_panic,
2120
take_cell::TakeCell,
22-
},
23-
internal_flags::InternalFlags,
24-
pollers::TrackedPermittedTqResp,
25-
protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage},
26-
telemetry::{VecDisplayer, set_trace_subscriber_for_current_thread},
27-
worker::{
21+
}, internal_flags::InternalFlags, pollers::TrackedPermittedTqResp, protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage}, telemetry::{VecDisplayer, metrics::{self, FailureReason}, set_trace_subscriber_for_current_thread}, worker::{
2822
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
2923
PostActivateHookData,
3024
activities::{ActivitiesFromWFTsHandle, LocalActivityManager},
@@ -37,7 +31,7 @@ use crate::{
3731
wft_poller::validate_wft,
3832
workflow_stream::{LocalInput, LocalInputs, WFStream},
3933
},
40-
},
34+
}
4135
};
4236
use anyhow::anyhow;
4337
use futures_util::{Stream, StreamExt, future::abortable, stream, stream::BoxStream};
@@ -57,19 +51,15 @@ use std::{
5751
};
5852
use temporalio_client::MESSAGE_TOO_LARGE_KEY;
5953
use temporalio_common::{
60-
errors::{CompleteWfError, PollError},
61-
protos::{
54+
errors::{CompleteWfError, PollError}, protos::{
6255
TaskToken,
6356
coresdk::{
6457
workflow_activation::{
6558
QueryWorkflow, WorkflowActivation, WorkflowActivationJob,
6659
remove_from_cache::EvictionReason, workflow_activation_job,
6760
},
6861
workflow_commands::*,
69-
workflow_completion,
70-
workflow_completion::{
71-
Failure, WorkflowActivationCompletion, workflow_activation_completion,
72-
},
62+
workflow_completion::{self, Failure, WorkflowActivationCompletion, workflow_activation_completion},
7363
},
7464
temporal::api::{
7565
command::v1::{Command as ProtoCommand, Command, command::Attributes},
@@ -84,8 +74,7 @@ use temporalio_common::{
8474
taskqueue::v1::StickyExecutionAttributes,
8575
workflowservice::v1::{PollActivityTaskQueueResponse, get_system_info_response},
8676
},
87-
},
88-
worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind},
77+
}, worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind}
8978
};
9079
use tokio::{
9180
sync::{
@@ -134,6 +123,7 @@ pub(crate) struct Workflows {
134123
local_act_mgr: Option<Arc<LocalActivityManager>>,
135124
ever_polled: AtomicBool,
136125
default_versioning_behavior: Option<VersioningBehavior>,
126+
metrics: MetricsContext,
137127
}
138128

139129
pub(crate) struct WorkflowBasics {
@@ -176,6 +166,7 @@ impl Workflows {
176166
let (fetch_tx, fetch_rx) = unbounded_channel();
177167
let shutdown_tok = basics.shutdown_token.clone();
178168
let task_queue = basics.worker_config.task_queue.clone();
169+
let metrics = basics.metrics.clone();
179170
let default_versioning_behavior = basics.default_versioning_behavior;
180171
let extracted_wft_stream = WFTExtractor::build(
181172
client.clone(),
@@ -267,6 +258,7 @@ impl Workflows {
267258
local_act_mgr,
268259
ever_polled: AtomicBool::new(false),
269260
default_versioning_behavior,
261+
metrics,
270262
}
271263
}
272264

@@ -431,6 +423,10 @@ impl Workflows {
431423
);
432424
self.handle_activation_failed(run_id, completion_time, new_outcome)
433425
.await;
426+
self.metrics.with_new_attrs([metrics::failure_reason(
427+
FailureReason::GrpcMessageTooLarge
428+
)])
429+
.wf_task_failed();
434430
return Err(e);
435431
}
436432
e => {

crates/sdk-core/tests/integ_tests/worker_tests.rs

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -210,8 +210,12 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() {
210210

211211
#[tokio::test]
212212
async fn oversize_grpc_message() {
213+
use crate::common::{prom_metrics, ANY_PORT, NAMESPACE};
213214
let wf_name = "oversize_grpc_message";
214-
let mut starter = CoreWfStarter::new(wf_name);
215+
// Enable Prometheus metrics for this test and capture the address
216+
let (telemopts, addr, _aborter) = prom_metrics(None);
217+
let runtime = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
218+
let mut starter = CoreWfStarter::new_with_runtime(wf_name, runtime);
215219
starter
216220
.worker_config
217221
.task_types(WorkerTaskTypes::workflow_only());
@@ -238,7 +242,25 @@ async fn oversize_grpc_message() {
238242
} else {
239243
false
240244
}
241-
}))
245+
}));
246+
247+
// Verify the workflow task failure metric includes the GrpcMessageTooLarge reason
248+
let tq = starter.get_task_queue();
249+
crate::common::eventually(
250+
|| async {
251+
let body = crate::integ_tests::metrics_tests::get_text(format!("http://{addr}/metrics")).await;
252+
if body.contains(&format!(
253+
"temporal_workflow_task_execution_failed{{failure_reason=\"GrpcMessageTooLarge\",namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",task_queue=\"{tq}\"}} 1"
254+
)) {
255+
Ok(())
256+
} else {
257+
Err(())
258+
}
259+
},
260+
Duration::from_secs(2),
261+
)
262+
.await
263+
.unwrap();
242264
}
243265

244266
#[tokio::test]

0 commit comments

Comments
 (0)