Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ message FragmentDistribution {
uint32 parallelism = 7;
uint32 vnode_count = 8;
stream_plan.StreamNode node = 9;
string parallelism_policy = 10;
}

message ListFragmentDistributionResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct RwFragment {
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
parallelism_policy: String,
node: JsonbVal,
}

Expand Down Expand Up @@ -74,6 +75,7 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen
.collect(),
parallelism: distribution.parallelism as i32,
max_parallelism: distribution.vnode_count as i32,
parallelism_policy: distribution.parallelism_policy,
node: json!(distribution.node).into(),
})
.collect())
Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,11 @@ fn fragment_desc_to_distribution(
table_id: fragment_desc.job_id as _,
distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
upstream_fragment_ids: upstreams.into_iter().map(|id| id as _).collect(),
fragment_type_mask: fragment_desc.fragment_type_mask as _,
parallelism: fragment_desc.parallelism as _,
vnode_count: fragment_desc.vnode_count as _,
node: Some(fragment_desc.stream_node.to_protobuf()),
parallelism_policy: fragment_desc.parallelism_policy,
}
}
193 changes: 149 additions & 44 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,48 +505,71 @@ impl CatalogController {
) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
let inner = self.inner.read().await;

let fragment_model_opt = FragmentModel::find_by_id(fragment_id)
let fragment_model = match FragmentModel::find_by_id(fragment_id)
.one(&inner.db)
.await?;

let fragment = fragment_model_opt.map(|fragment| {
let info = self.env.shared_actor_infos().read_guard();

let SharedFragmentInfo { actors, .. } = info
.get_fragment(fragment.fragment_id as _)
.unwrap_or_else(|| {
panic!(
"Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
fragment.fragment_id,
fragment.job_id
)
});

FragmentDesc {
fragment_id: fragment.fragment_id,
job_id: fragment.job_id,
fragment_type_mask: fragment.fragment_type_mask,
distribution_type: fragment.distribution_type,
state_table_ids: fragment.state_table_ids.clone(),
vnode_count: fragment.vnode_count,
stream_node: fragment.stream_node.clone(),
parallelism: actors.len() as _,
}
});

let Some(fragment) = fragment else {
return Ok(None); // Fragment not found
.await?
{
Some(fragment) => fragment,
None => return Ok(None),
};

// Get upstream fragments
let upstreams: Vec<FragmentId> = FragmentRelation::find()
let job_parallelism: Option<StreamingParallelism> =
StreamingJob::find_by_id(fragment_model.job_id)
.select_only()
.column(streaming_job::Column::Parallelism)
.into_tuple()
.one(&inner.db)
.await?;

let upstream_entries: Vec<(FragmentId, DispatcherType)> = FragmentRelation::find()
.select_only()
.column(fragment_relation::Column::SourceFragmentId)
.columns([
fragment_relation::Column::SourceFragmentId,
fragment_relation::Column::DispatcherType,
])
.filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
.into_tuple()
.all(&inner.db)
.await?;

let mut upstreams = Vec::with_capacity(upstream_entries.len());
let mut noshuffle_upstreams = Vec::new();
for (source_id, dispatcher_type) in upstream_entries {
upstreams.push(source_id);
if dispatcher_type == DispatcherType::NoShuffle {
noshuffle_upstreams.push(source_id);
}
}

let info = self.env.shared_actor_infos().read_guard();
let SharedFragmentInfo { actors, .. } = info
.get_fragment(fragment_model.fragment_id as _)
.unwrap_or_else(|| {
panic!(
"Failed to retrieve fragment description: fragment {} (job_id {}) not found in shared actor info",
fragment_model.fragment_id,
fragment_model.job_id
)
});

let parallelism_policy = Self::format_fragment_parallelism_policy(
&fragment_model,
job_parallelism.as_ref(),
&noshuffle_upstreams,
);

let fragment = FragmentDesc {
fragment_id: fragment_model.fragment_id,
job_id: fragment_model.job_id,
fragment_type_mask: fragment_model.fragment_type_mask,
distribution_type: fragment_model.distribution_type,
state_table_ids: fragment_model.state_table_ids.clone(),
parallelism: actors.len() as _,
vnode_count: fragment_model.vnode_count,
stream_node: fragment_model.stream_node.clone(),
parallelism_policy,
};

Ok(Some((fragment, upstreams)))
}

Expand Down Expand Up @@ -1145,22 +1168,56 @@ impl CatalogController {

let fragment_ids = fragments.iter().map(|f| f.fragment_id).collect_vec();

let upstreams: Vec<(FragmentId, FragmentId)> = FragmentRelation::find()
.select_only()
.columns([
fragment_relation::Column::TargetFragmentId,
fragment_relation::Column::SourceFragmentId,
])
.filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids))
.into_tuple()
.all(&txn)
.await?;
let job_parallelisms: HashMap<ObjectId, StreamingParallelism> = if fragments.is_empty() {
HashMap::new()
} else {
let job_ids = fragments.iter().map(|f| f.job_id).unique().collect_vec();
StreamingJob::find()
.select_only()
.columns([
streaming_job::Column::JobId,
streaming_job::Column::Parallelism,
])
.filter(streaming_job::Column::JobId.is_in(job_ids))
.into_tuple()
.all(&txn)
.await?
.into_iter()
.collect()
};

let upstream_entries: Vec<(FragmentId, FragmentId, DispatcherType)> =
if fragment_ids.is_empty() {
Vec::new()
} else {
FragmentRelation::find()
.select_only()
.columns([
fragment_relation::Column::TargetFragmentId,
fragment_relation::Column::SourceFragmentId,
fragment_relation::Column::DispatcherType,
])
.filter(fragment_relation::Column::TargetFragmentId.is_in(fragment_ids))
.into_tuple()
.all(&txn)
.await?
};

let guard = self.env.shared_actor_info.read_guard();

let mut result = Vec::new();

let mut all_upstreams = upstreams.into_iter().into_group_map();
let mut all_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
let mut noshuffle_upstreams: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
for (target_id, source_id, dispatcher_type) in upstream_entries {
all_upstreams.entry(target_id).or_default().push(source_id);
if dispatcher_type == DispatcherType::NoShuffle {
noshuffle_upstreams
.entry(target_id)
.or_default()
.push(source_id);
}
}

for fragment_desc in fragments {
// note: here sometimes fragment is not found in the cache, fallback to 0
Expand All @@ -1172,6 +1229,15 @@ impl CatalogController {
let upstreams = all_upstreams
.remove(&fragment_desc.fragment_id)
.unwrap_or_default();
let noshuffle_upstreams = noshuffle_upstreams
.remove(&fragment_desc.fragment_id)
.unwrap_or_default();

let parallelism_policy = Self::format_fragment_parallelism_policy(
&fragment_desc,
job_parallelisms.get(&fragment_desc.job_id),
&noshuffle_upstreams,
);

let fragment = FragmentDistribution {
fragment_id: fragment_desc.fragment_id as _,
Expand All @@ -1184,13 +1250,52 @@ impl CatalogController {
parallelism: parallelism as _,
vnode_count: fragment_desc.vnode_count as _,
node: Some(fragment_desc.stream_node.to_protobuf()),
parallelism_policy,
};

result.push((fragment, upstreams));
}
Ok(result)
}

/// Renders a human-readable description of how a fragment's parallelism is
/// decided, in precedence order: a singleton distribution, an explicit
/// per-fragment override, a no-shuffle dependency on upstream fragments, or
/// inheritance from the owning streaming job.
fn format_fragment_parallelism_policy(
    fragment: &fragment::Model,
    job_parallelism: Option<&StreamingParallelism>,
    noshuffle_upstreams: &[FragmentId],
) -> String {
    // Singleton fragments are pinned to a single actor regardless of any
    // configured parallelism.
    if fragment.distribution_type == DistributionType::Single {
        return "single".to_owned();
    }

    match fragment.parallelism.as_ref() {
        // An explicit per-fragment setting wins over everything below.
        Some(explicit) => format!(
            "override({})",
            Self::format_streaming_parallelism(explicit)
        ),
        // No-shuffle edges force this fragment to mirror its upstream
        // fragments; report the sorted, de-duplicated upstream ids.
        None if !noshuffle_upstreams.is_empty() => {
            let mut sources: Vec<FragmentId> = noshuffle_upstreams.to_vec();
            sources.sort_unstable();
            sources.dedup();
            format!("upstream_fragment({sources:?})")
        }
        // Otherwise the fragment inherits the job-level parallelism; the job
        // row may be missing (e.g. already dropped), hence "unknown".
        None => {
            let inherited = match job_parallelism {
                Some(parallelism) => Self::format_streaming_parallelism(parallelism),
                None => "unknown".to_owned(),
            };
            format!("inherit({inherited})")
        }
    }
}

/// Formats a [`StreamingParallelism`] value as the short lowercase token used
/// in parallelism-policy strings (e.g. `adaptive`, `fixed(4)`, `custom`).
fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        // The fixed variant carries the actor count, so it needs formatting.
        StreamingParallelism::Fixed(count) => return format!("fixed({count})"),
        StreamingParallelism::Adaptive => "adaptive",
        StreamingParallelism::Custom => "custom",
    }
    .to_owned()
}

pub async fn list_sink_actor_mapping(
&self,
) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ pub struct FragmentDesc {
pub parallelism: i64,
pub vnode_count: i32,
pub stream_node: StreamNode,
pub parallelism_policy: String,
}

/// List all objects that are using the given one in a cascade way. It runs a recursive CTE to find all the dependencies.
Expand Down