
feat: Support Job-Level Dependency with Async Job Activation#1428

Open
danielhumanmod wants to merge 3 commits into apache:main from danielhumanmod:job-level-dependency

Conversation


@danielhumanmod danielhumanmod commented Jan 30, 2026

Which issue does this PR close?

Closes #1344 as Step 1: building the job-level dependency logic.

For the next step, we will apply this to the EXPLAIN ANALYZE job as the first use case.

Rationale for this change

Ballista currently treats a single job as the atomic unit of scheduling, with no mechanism to run multiple jobs sequentially.

To support features like EXPLAIN ANALYZE, we need job-level dependencies. This PR allows the scheduler to split complex plans into dependent jobs, ensuring downstream jobs only trigger after upstream dependencies successfully complete and provide necessary metrics/state.

What changes are included in this PR?

This PR introduces a new `Pending` status and a dependency resolution mechanism within the TaskManager.

Job submission flow

  1. Client fires an execute_query gRPC request
  2. Scheduler splits the plan into multiple jobs
  3. Upstream jobs are submitted into the queued_jobs queue
  4. Downstream jobs are submitted into the pending_jobs queue
  5. On upstream job completion, resolve dependencies and queue a downstream job once all of its dependencies have completed
  6. Execute the downstream job
image (green components are the newly added ones)

Job lookup flow

image

Are there any user-facing changes?

No

@danielhumanmod danielhumanmod changed the title [WIP] feat: Support Job-Level Dependency with Async Job Activation feat: Support Job-Level Dependency with Async Job Activation Jan 31, 2026
@danielhumanmod danielhumanmod marked this pull request as ready for review January 31, 2026 06:40
@danielhumanmod
Contributor Author

To make the review easier, I have not committed the newly added tests; I will do so later.

Contributor

@milenkovicm milenkovicm left a comment


cool add-on, thanks @danielhumanmod

i just had a quick look, will try to review it asap. One question: could we avoid the job splitting registry and do the splitting in a custom query planner if needed?

/// Registry for job split rules
///
/// Manages a collection of rules and provides methods to apply them to plans.
#[derive(Clone)]
Contributor


do we need plan JobSplitRegistry? can this be done as part of custom query planner ?

Contributor Author

@danielhumanmod danielhumanmod Feb 3, 2026


do we need plan JobSplitRegistry? can this be done as part of custom query planner ?

Good point, I also thought about using that. However, the query planner doesn't seem to fit here for two reasons:

  • 1-to-N Mapping Requirement: The job splitter needs to map 1 LogicalPlan → 2 LogicalPlans (which are then submitted as distinct jobs), but the query planner (or optimizer) protocol is designed for a 1:1 mapping.

  • Timing: The query planner (or optimizer) logic happens inside submit_job, but we need to split the plan before job submission so that the jobs can be submitted separately (e.g., one Queued, one Pending).

Intuitively, the query planner/optimizer works intra-job (optimizing within a single job context), whereas we need something that works inter-job (orchestrating across multiple jobs). I haven't found a better solution yet, but I'd love to hear your thoughts!

Contributor


I believe there is a difference between using optimiser and query planner.

Query planner https://docs.rs/datafusion/latest/datafusion/execution/context/trait.QueryPlanner.html

It could hold a reference to the scheduler and may decide to split the logical plan, produce the physical plan for the first job, and schedule execution of the second. We would need job info as part of the config passed to create_physical_plan.

Perhaps we could investigate spark handling of similar scenarios?

Copy link
Contributor Author


Thanks for the suggestion regarding QueryPlanner. I dug deeper into the suggested approach but found a timing issue that can be the blocker:

The Flow:

gRPC (returns Job ID) -> JobQueued Event -> submit_job -> create_physical_plan (Planner)

Still taking EXPLAIN ANALYZE as the example, if we use create_physical_plan to:

  • produce the physical plan for the first job (the Query)
  • submit the second job (the Explain) as pending:

The blocker: job_id is returned to the client immediately after the JobQueued event is posted, before the planner runs.

This means the client is holding the job_id of the First Job (the Query).

Consequently, the client will receive the raw query results (record batches) instead of the expected EXPLAIN output (which is now in the Second Job).

Since the client is already listening to the original job_id by the time the planner runs, we cannot switch the track to the Second Job.

Contributor Author

@danielhumanmod danielhumanmod Feb 4, 2026


Spark handles this via ExplainCommand on the Driver side. When executing EXPLAIN ANALYZE, the Driver synchronously triggers the underlying query execution, blocks until completion to collect runtime metrics, and then returns the result to the client. But we mainly handle it on the scheduler side (async + event-driven), which means we probably cannot perform a synchronous blocking call like the Spark driver does.

Contributor

@milenkovicm milenkovicm Feb 4, 2026


Please don't only focus on EXPLAIN, as other scenarios such as cache support may depend on this.
edit: ignore for now

Contributor


Spark handles this via ExplainCommand on the Driver side. When executing EXPLAIN ANALYZE, the Driver synchronously triggers the underlying query execution, blocks until completion to collect runtime metrics, and then returns the result to the client. But we mainly handle it on the scheduler side (async + event-driven), which means we probably cannot perform a synchronous blocking call like the Spark driver does.

i'm just thinking, maybe we could do the same thing: we already have a custom planner on the client side, so we could add EXPLAIN ANALYZE handling there, and when the job finishes it can fetch job info from the REST interface.

Contributor Author


Cool, let me run an experiment on its implementation.
At the same time, could you share your concerns about the job splitter? Is it that it feels too heavy? Would it be more acceptable as just a helper function, or is the additional transform layer itself what worries you?

Contributor


Will do @danielhumanmod, just give me some time.

Contributor

@milenkovicm milenkovicm left a comment


thanks @danielhumanmod, and first of all sorry for the late review.

I'm a bit puzzled about which approach would be best for this.

I'm not sure if just splitting jobs by logical plan will be sufficient. One simple case: if we just have the job split registry, how can we "communicate" the outcome of the first job to the second job?

For example, if job-a generates a set of exchange files, how can we share their location with the dependent job (job-b) if its logical plan has already been created?

Could we have some kind of callback mechanism which could be used to describe dependency information and share job information:

let job_a = job_a.and_then(|job_info| {
    job_b_definition(job_info)
});

submit(job_a);

On success of job_a, the scheduler would invoke the and_then callback and generate the definition for job_b. This way job_b's logical plan can be built from job_a's runtime output, i.e. something that can't be derived from a composite logical plan of job_a + job_b, as is currently the case with the overarching logical plan.

Or we could have multi-job dependencies:

let job_a = job_a.and_then(|job_info| {
    job_b_definition(job_info).and_then(|...| { ... })
});

submit(job_a);

Open to suggestions, and thanks a lot for taking the time to drive this.


Development

Successfully merging this pull request may close these issues.

Support EXPLAIN ANALYZE in Ballista

2 participants