diff --git a/proto/meta.proto b/proto/meta.proto
index d40e398ce6f24..8e38f5c1ed987 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -264,6 +264,7 @@ message FragmentDistribution {
   uint32 parallelism = 7;
   uint32 vnode_count = 8;
   stream_plan.StreamNode node = 9;
+  string parallelism_policy = 10;
 }
 
 message ListFragmentDistributionResponse {
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_parallelism.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_parallelism.rs
index 097c22ad50db0..09d9640888a3e 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_parallelism.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragment_parallelism.rs
@@ -37,7 +37,8 @@ use risingwave_frontend_macro::system_catalog;
         f.upstream_fragment_ids,
         f.flags,
         f.parallelism,
-        f.max_parallelism
+        f.max_parallelism,
+        f.parallelism_policy
     FROM all_streaming_jobs job
     INNER JOIN rw_fragments f ON job.id = f.table_id
     ORDER BY job.id"
@@ -54,4 +55,5 @@ struct RwFragmentParallelism {
    flags: Vec<String>,
    parallelism: i32,
    max_parallelism: i32,
+   parallelism_policy: String,
 }
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs
index 186a212270866..f26e694a26670 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs
@@ -31,6 +31,7 @@ struct RwFragment {
     flags: Vec<String>,
     parallelism: i32,
     max_parallelism: i32,
+    parallelism_policy: String,
     node: JsonbVal,
 }
@@ -74,6 +75,7 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragment>> {
+            parallelism_policy: distribution.parallelism_policy.clone(),
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ ... @@ impl CatalogController {
     pub async fn get_fragment_desc_by_id(
         &self,
         fragment_id: FragmentId,
     ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
         let inner = self.inner.read().await;
-        let fragment_model_opt = FragmentModel::find_by_id(fragment_id)
+        let fragment_model = match FragmentModel::find_by_id(fragment_id)
             .one(&inner.db)
-            .await?;
-
-        let fragment = fragment_model_opt.map(|fragment| {
-            let info = self.env.shared_actor_infos().read_guard();
-
-            let SharedFragmentInfo { actors, .. } = info
-                .get_fragment(fragment.fragment_id as _)
-                .unwrap_or_else(|| {
-                    panic!(
-                        "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
-                        fragment.fragment_id,
-                        fragment.job_id
-                    )
-                });
-
-            FragmentDesc {
-                fragment_id: fragment.fragment_id,
-                job_id: fragment.job_id,
-                fragment_type_mask: fragment.fragment_type_mask,
-                distribution_type: fragment.distribution_type,
-                state_table_ids: fragment.state_table_ids.clone(),
-                vnode_count: fragment.vnode_count,
-                stream_node: fragment.stream_node.clone(),
-                parallelism: actors.len() as _,
-            }
-        });
-
-        let Some(fragment) = fragment else {
-            return Ok(None); // Fragment not found
+            .await?
+        {
+            Some(fragment) => fragment,
+            None => return Ok(None),
         };
 
-        // Get upstream fragments
-        let upstreams: Vec<FragmentId> = FragmentRelation::find()
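+        // Look up the owning job's configured parallelism: it backs the
+        // "inherit(...)" policy when the fragment carries no explicit override.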
+        let job_parallelism: Option<StreamingParallelism> =
+            StreamingJob::find_by_id(fragment_model.job_id)
+                .select_only()
+                .column(streaming_job::Column::Parallelism)
+                .into_tuple()
+                .one(&inner.db)
+                .await?;
+
+        let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
             .select_only()
-            .column(fragment_relation::Column::SourceFragmentId)
+            .columns([
+                fragment_relation::Column::SourceFragmentId,
+                fragment_relation::Column::DispatcherType,
+            ])
             .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
             .into_tuple()
             .all(&inner.db)
             .await?;
 
+        let upstreams: Vec<_> = upstream_entries
+            .into_iter()
+            .map(|(source_id, _)| source_id)
+            .collect();
+
+        let root_fragment_map = find_fragment_no_shuffle_dags_detailed(&inner.db, &[fragment_id])
+            .await
+            .map(Self::collect_root_fragment_mapping)?;
+        let root_fragments = root_fragment_map
+            .get(&fragment_id)
+            .cloned()
+            .unwrap_or_default();
+
+        let info = self.env.shared_actor_infos().read_guard();
+        let SharedFragmentInfo { actors, .. } = info
+            .get_fragment(fragment_model.fragment_id as _)
+            .unwrap_or_else(|| {
+                panic!(
+                    "Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
+                    fragment_model.fragment_id,
+                    fragment_model.job_id
+                )
+            });
+
+        let parallelism_policy = Self::format_fragment_parallelism_policy(
+            &fragment_model,
+            job_parallelism.as_ref(),
+            &root_fragments,
+        );
+
+        let fragment = FragmentDesc {
+            fragment_id: fragment_model.fragment_id,
+            job_id: fragment_model.job_id,
+            fragment_type_mask: fragment_model.fragment_type_mask,
+            distribution_type: fragment_model.distribution_type,
+            state_table_ids: fragment_model.state_table_ids.clone(),
+            parallelism: actors.len() as _,
+            vnode_count: fragment_model.vnode_count,
+            stream_node: fragment_model.stream_node.clone(),
+            parallelism_policy,
+        };
+
         Ok(Some((fragment, upstreams)))
     }
@@ -542,7 +570,7 @@ impl CatalogController {
             .remove(&job_id);
 
         Self::compose_table_fragments(
-            job_id as _,
+            job_id,
             job_info.job_status.into(),
             job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
             fragment_actors,
@@ -925,7 +953,7 @@ impl CatalogController {
             table_fragments.insert(
                 job.job_id,
                 Self::compose_table_fragments(
-                    job.job_id as _,
+                    job.job_id,
                     job.job_status.into(),
                     job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                     fragment_actors,
@@ -1064,23 +1092,57 @@ impl CatalogController {
         let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();
 
-        let upstreams: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
-            .select_only()
-            .columns([
-                fragment_relation::Column::TargetFragmentId,
-                fragment_relation::Column::SourceFragmentId,
-            ])
-            .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids))
-            .into_tuple()
-            .all(&txn)
-            .await?;
+        let job_parallelisms: HashMap<ObjectId, StreamingParallelism> = if fragments.is_empty() {
+            HashMap::new()
+        } else {
+            let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
+            StreamingJob::find()
+                .select_only()
+                .columns([
+                    streaming_job::Column::JobId,
+                    streaming_job::Column::Parallelism,
+                ])
+                .filter(streaming_job::Column::JobId.is_in(job_ids))
+                .into_tuple()
+                .all(&txn)
+                .await?
+                .into_iter()
+                .collect()
+        };
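+
+        // Fetch upstream edges for all listed fragments in one query; the
+        // dispatcher type is selected alongside the source id, but only the
+        // source id is kept when grouping by target below.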
+        let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
+            if fragment_ids.is_empty() {
+                Vec::new()
+            } else {
+                FragmentRelation::find()
+                    .select_only()
+                    .columns([
+                        fragment_relation::Column::TargetFragmentId,
+                        fragment_relation::Column::SourceFragmentId,
+                        fragment_relation::Column::DispatcherType,
+                    ])
+                    .filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids.clone()))
+                    .into_tuple()
+                    .all(&txn)
+                    .await?
+            };
+
+        let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
+        for (target_id, source_id, _) in upstream_entries {
+            all_upstreams.entry(target_id).or_default().push(source_id);
+        }
+
+        let root_fragment_map = if fragment_ids.is_empty() {
+            HashMap::new()
+        } else {
+            let ensembles = find_fragment_no_shuffle_dags_detailed(&txn, &fragment_ids).await?;
+            Self::collect_root_fragment_mapping(ensembles)
+        };
 
         let guard = self.env.shared_actor_info.read_guard();
 
         let mut result = Vec::new();
 
-        let mut all_upstreams = upstreams.into_iter().into_group_map();
-
         for fragment_desc in fragments {
             // note: here sometimes fragment is not found in the cache, fallback to 0
             let parallelism = guard
@@ -1088,10 +1150,21 @@
                 .map(|fragment| fragment.actors.len())
                 .unwrap_or_default();
 
+            let root_fragments = root_fragment_map
+                .get(&fragment_desc.fragment_id)
+                .cloned()
+                .unwrap_or_default();
+
             let upstreams = all_upstreams
                 .remove(&fragment_desc.fragment_id)
                 .unwrap_or_default();
 
+            let parallelism_policy = Self::format_fragment_parallelism_policy(
+                &fragment_desc,
+                job_parallelisms.get(&fragment_desc.job_id),
+                &root_fragments,
+            );
+
             let fragment = FragmentDistribution {
                 fragment_id: fragment_desc.fragment_id as _,
                 table_id: fragment_desc.job_id.as_raw_id(),
@@ -1103,6 +1176,7 @@
                 parallelism: parallelism as _,
                 vnode_count: fragment_desc.vnode_count as _,
                 node: Some(fragment_desc.stream_node.to_protobuf()),
+                parallelism_policy,
             };
 
             result.push((fragment, upstreams));
@@ -1110,6 +1184,73 @@
         Ok(result)
     }
 
+    /// Build a fragment-to-root lookup for all reported root fragment ensembles.
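+    ///
+    /// Entry (root) fragments of an ensemble map to an empty `Vec`; every other
+    /// member maps to the sorted, deduplicated ids of that ensemble's roots.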
+    fn collect_root_fragment_mapping(
+        ensembles: Vec<NoShuffleEnsemble>,
+    ) -> HashMap<FragmentId, Vec<FragmentId>> {
+        let mut mapping = HashMap::new();
+
+        for ensemble in ensembles {
+            let mut roots: Vec<_> = ensemble.entry_fragments().collect();
+            roots.sort_unstable();
+            roots.dedup();
+
+            if roots.is_empty() {
+                continue;
+            }
+
+            let root_set: HashSet<_> = roots.iter().copied().collect();
+
+            for fragment_id in ensemble.component_fragments() {
+                if root_set.contains(&fragment_id) {
+                    mapping.insert(fragment_id, Vec::new());
+                } else {
+                    mapping.insert(fragment_id, roots.clone());
+                }
+            }
+        }
+
+        mapping
+    }
+
+    fn format_fragment_parallelism_policy(
+        fragment: &fragment::Model,
+        job_parallelism: Option<&StreamingParallelism>,
+        root_fragments: &[FragmentId],
+    ) -> String {
+        if fragment.distribution_type == DistributionType::Single {
+            return "single".to_owned();
+        }
+
+        if let Some(parallelism) = fragment.parallelism.as_ref() {
+            return format!(
+                "override({})",
+                Self::format_streaming_parallelism(parallelism)
+            );
+        }
+
+        if !root_fragments.is_empty() {
+            let mut upstreams = root_fragments.to_vec();
+            upstreams.sort_unstable();
+            upstreams.dedup();
+
+            return format!("upstream_fragment({upstreams:?})");
+        }
+
+        let inherited = job_parallelism
+            .map(Self::format_streaming_parallelism)
+            .unwrap_or_else(|| "unknown".to_owned());
+        format!("inherit({inherited})")
+    }
+
+    fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
+        match parallelism {
+            StreamingParallelism::Adaptive => "adaptive".to_owned(),
+            StreamingParallelism::Fixed(n) => format!("fixed({n})"),
+            StreamingParallelism::Custom => "custom".to_owned(),
+        }
+    }
+
     pub async fn list_sink_actor_mapping(
         &self,
     ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
@@ -1673,8 +1814,8 @@ mod tests {
     use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
     use risingwave_meta_model::fragment::DistributionType;
     use risingwave_meta_model::{
-        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, StreamNode, TableId,
-        VnodeBitmap, fragment,
+        ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, StreamNode,
+        StreamingParallelism, TableId, VnodeBitmap, fragment,
     };
     use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
     use risingwave_pb::plan_common::PbExprContext;
@@ -1965,4 +2106,54 @@
         );
         assert_eq!(fragment.stream_node.to_protobuf(), nodes);
     }
+
+    #[test]
+    fn test_parallelism_policy_with_root_fragments() {
+        #[expect(deprecated)]
+        let fragment = fragment::Model {
+            fragment_id: 3,
+            job_id: TEST_JOB_ID,
+            fragment_type_mask: 0,
+            distribution_type: DistributionType::Hash,
+            stream_node: StreamNode::from(&PbStreamNode::default()),
+            state_table_ids: I32Array::default(),
+            upstream_fragment_id: Default::default(),
+            vnode_count: 0,
+            parallelism: None,
+        };
+
+        let job_parallelism = StreamingParallelism::Fixed(4);
+
+        let policy = super::CatalogController::format_fragment_parallelism_policy(
+            &fragment,
+            Some(&job_parallelism),
+            &[],
+        );
+
+        assert_eq!(policy, "inherit(fixed(4))");
+    }
+
+    #[test]
+    fn test_parallelism_policy_with_upstream_roots() {
+        #[expect(deprecated)]
+        let fragment = fragment::Model {
+            fragment_id: 5,
+            job_id: TEST_JOB_ID,
+            fragment_type_mask: 0,
+            distribution_type: DistributionType::Hash,
+            stream_node: StreamNode::from(&PbStreamNode::default()),
+            state_table_ids: I32Array::default(),
+            upstream_fragment_id: Default::default(),
+            vnode_count: 0,
+            parallelism: None,
+        };
+
+        let policy = super::CatalogController::format_fragment_parallelism_policy(
+            &fragment,
+            None,
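+            // unsorted ids with a duplicate: the formatter sorts and dedups them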
+            &[3, 1, 2, 1],
+        );
+
+        assert_eq!(policy, "upstream_fragment([1, 2, 3])");
+    }
 }
diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs
index 21e9d540ffc85..363fe2deffc4f 100644
--- a/src/meta/src/controller/utils.rs
+++ b/src/meta/src/controller/utils.rs
@@ -290,6 +290,7 @@ pub struct FragmentDesc {
     pub parallelism: i64,
     pub vnode_count: i32,
     pub stream_node: StreamNode,
+    pub parallelism_policy: String,
 }
 
 /// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 2f3dbdb8234b2..25ee0f76c9568 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1013,7 +1013,7 @@ impl DdlController {
         let (aborted, _) = self
             .metadata_manager
             .catalog_controller
-            .try_abort_creating_streaming_job(job_id as _, false)
+            .try_abort_creating_streaming_job(job_id, false)
             .await?;
         if aborted {
             tracing::warn!(id = %job_id, "aborted streaming job");