feat: Support Job-Level Dependency with Async Job Activation #1428
danielhumanmod wants to merge 3 commits into apache:main
Conversation
To make the review easier, I have not committed the newly added tests; I will do so later.
milenkovicm
left a comment
cool add-on, thanks @danielhumanmod
I just had a quick look and will try to review it ASAP. One question: could we avoid the job split registry and do the splitting in a custom query planner if needed?
```rust
/// Registry for job split rules
///
/// Manages a collection of rules and provides methods to apply them to plans.
#[derive(Clone)]
```
do we need JobSplitRegistry? can this be done as part of a custom query planner?

> do we need JobSplitRegistry? can this be done as part of a custom query planner?
Good point, I also thought about using that. However, the query planner doesn't seem to fit here, for two reasons:
- 1-to-N mapping requirement: the job splitter needs to map 1 LogicalPlan → 2 LogicalPlans (which are then submitted as distinct jobs), but the query planner (or optimizer) protocol is designed for a 1:1 mapping.
- Timing: the query planner (or optimizer) logic happens inside submit_job, but we need to split the plan before job submission so that the jobs can be submitted separately (e.g., one Queued, one Pending).

Intuitively, the query planner/optimizer works intra-job (optimizing within a single job context), whereas we need something that works inter-job (orchestrating across multiple jobs). I haven't found a better solution yet, but I'd love to hear your thoughts!
I believe there is a difference between using the optimizer and the query planner.
Query planner: https://docs.rs/datafusion/latest/datafusion/execution/context/trait.QueryPlanner.html
It could have a reference to the scheduler, and it may decide to split the logical plan, produce the physical plan for the first job, and schedule execution of the second. We would need job info as part of the config passed to create_physical_plan.
Perhaps we could investigate spark handling of similar scenarios?
Thanks for the suggestion regarding QueryPlanner. I dug deeper into the suggested approach but found that a timing issue may be the blocker:
The Flow:
gRPC (returns Job ID) -> JobQueued Event -> submit_job -> create_physical_plan (Planner)
Still taking EXPLAIN ANALYZE as the example: if we use create_physical_plan to
- produce the physical plan for the first job (the Query), and
- submit the second job (the Explain) as pending,

the blocker will be that job_id is returned to the client immediately after the JobQueued event is posted, before the planner runs.
This means the client is holding the job_id of the first job (the Query).
Consequently, the client will receive the raw query results (record batches) instead of the expected EXPLAIN output (which is now in the second job).
Since the client is already listening on the original job_id by the time the planner runs, we cannot switch it over to the second job.
Spark handles this via ExplainCommand on the Driver side. When executing EXPLAIN ANALYZE, the Driver synchronously triggers the underlying query execution, blocks until completion to collect runtime metrics, and then returns the result to the client. But we mainly handle it on the scheduler side (async + event-driven), which means we probably cannot perform a synchronous blocking call like the Spark driver does.
Please don't focus only on EXPLAIN, as other scenarios such as cache support may depend on this.
edit: ignore for now
> Spark handles this via ExplainCommand on the Driver side. When executing EXPLAIN ANALYZE, the Driver synchronously triggers the underlying query execution, blocks until completion to collect runtime metrics, and then returns the result to the client. But we mainly handle it on the scheduler side (async + event-driven), which means we probably cannot perform a synchronous blocking call like the Spark driver does.
I'm just thinking, maybe we could do the same thing: we already have a custom planner on the client side, so we could add EXPLAIN ANALYZE handling there; when the job finishes, it can fetch the job info from the REST interface.
Cool, let me run an experiment on its implementation.
At the same time, could you share your concerns about the job splitter? Is it that it feels too heavy? Would it be more acceptable if it were just a helper function, or is the additional transform layer itself what worries you?
milenkovicm
left a comment
thanks @danielhumanmod, and first of all sorry for the late review.
I'm a bit puzzled about which approach would be best for this.
I'm not sure if just splitting jobs by logical plan will be sufficient. One simple case: if we just have a job split registry, how can we "communicate" the outcome of the first job to the second job?
For example, if job-a generates a set of exchange files, how can we share their location with the dependent job (job-b) if the logical plan has already been created?
Could we have some kind of callback mechanism which could be used to describe dependency information and share job information:

```rust
let job_a = job_a.and_then(|job_info| {
    job_b_definition(job_info)
});
submit(job_a);
```

On success of job_a, the scheduler would invoke the and_then callback and generate the definition for job_b. This way, job_b's logical plan could be something that can't be derived from a composite logical plan of job_a + job_b, as is currently the case with the overarching logical plan.
Or we could have multi-job dependencies:

```rust
let job_a = job_a.and_then(|job_info| {
    job_b_definition(job_info).and_then(|...| { ... })
});
submit(job_a);
```
open for suggestions.
and thanks a lot for taking the time to drive this,
Which issue does this PR close?
Closes #1344, as Step 1 - building the job-level dependency logic.
For the next step, we will apply the EXPLAIN ANALYZE job as the first use case.

Rationale for this change
Ballista currently treats a single job as the atomic unit of scheduling, with no mechanism to run multiple jobs sequentially.
To support features like EXPLAIN ANALYZE, we need job-level dependencies. This PR allows the scheduler to split complex plans into dependent jobs, ensuring downstream jobs only trigger after upstream dependencies successfully complete and provide the necessary metrics/state.

What changes are included in this PR?
This PR introduces a new `Pending` status and a dependency resolution mechanism within the TaskManager.
Job submission flow (starting from the execute_query gRPC request; diagram omitted)
Job lookup flow (diagram omitted)
Are there any user-facing changes?
No