Skip to content

Commit eafed78

Browse files
committed
move pruning function for job lineage summary
1 parent 5fbad84 commit eafed78

File tree

2 files changed

+110
-121
lines changed

2 files changed

+110
-121
lines changed

core/scheduler/job_lineage.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package scheduler
22

33
import (
4+
"sort"
45
"time"
56

67
"github.com/goto/optimus/core/tenant"
78
"github.com/goto/optimus/internal/lib/window"
89
)
910

11+
const (
12+
// MaxLineageDepth is a safeguard to avoid infinite recursion in case of unexpected cycles
13+
MaxLineageDepth = 20
14+
)
15+
1016
type JobSchedule struct {
1117
JobName JobName
1218
ScheduledAt time.Time
@@ -25,6 +31,109 @@ type JobLineageSummary struct {
2531
JobRuns map[string]*JobRunSummary
2632
}
2733

34+
type upstreamCandidate struct {
35+
JobName JobName
36+
EndTime time.Time
37+
}
38+
39+
func (j *JobLineageSummary) PruneLineage(maxUpstreamsPerLevel int) *JobLineageSummary {
40+
type nodeInfo struct {
41+
original *JobLineageSummary
42+
pruned *JobLineageSummary
43+
depth int
44+
}
45+
46+
rootPruned := &JobLineageSummary{
47+
JobName: j.JobName,
48+
Tenant: j.Tenant,
49+
Window: j.Window,
50+
ScheduleInterval: j.ScheduleInterval,
51+
SLA: j.SLA,
52+
JobRuns: j.JobRuns,
53+
}
54+
55+
queue := []*nodeInfo{{
56+
original: j,
57+
pruned: rootPruned,
58+
depth: 0,
59+
}}
60+
61+
processed := make(map[JobName]*JobLineageSummary)
62+
processed[j.JobName] = queue[0].pruned
63+
64+
for len(queue) > 0 {
65+
current := queue[0]
66+
queue = queue[1:]
67+
68+
if current.depth >= MaxLineageDepth {
69+
continue
70+
}
71+
72+
upstreams := current.original.Upstreams
73+
candidates := extractUpstreamCandidatesSortedByDuration(current.original)
74+
75+
for i := 0; i < maxUpstreamsPerLevel && i < len(candidates); i++ {
76+
targetJobName := candidates[i].JobName
77+
for _, upstream := range upstreams {
78+
if upstream.JobName == targetJobName {
79+
if existingPruned, exists := processed[upstream.JobName]; exists {
80+
current.pruned.Upstreams = append(current.pruned.Upstreams, existingPruned)
81+
} else {
82+
prunedUpstream := &JobLineageSummary{
83+
JobName: upstream.JobName,
84+
Tenant: upstream.Tenant,
85+
Window: upstream.Window,
86+
ScheduleInterval: upstream.ScheduleInterval,
87+
SLA: upstream.SLA,
88+
JobRuns: upstream.JobRuns,
89+
}
90+
current.pruned.Upstreams = append(current.pruned.Upstreams, prunedUpstream)
91+
processed[upstream.JobName] = prunedUpstream
92+
queue = append(queue, &nodeInfo{
93+
original: upstream,
94+
pruned: prunedUpstream,
95+
depth: current.depth + 1,
96+
})
97+
}
98+
break
99+
}
100+
}
101+
}
102+
}
103+
104+
return rootPruned
105+
}
106+
107+
func extractUpstreamCandidatesSortedByDuration(lineage *JobLineageSummary) []upstreamCandidate {
108+
candidates := []upstreamCandidate{}
109+
110+
for _, upstream := range lineage.Upstreams {
111+
latestFinishTime := getLatestFinishTime(upstream.JobRuns)
112+
candidates = append(candidates, upstreamCandidate{
113+
JobName: upstream.JobName,
114+
EndTime: latestFinishTime,
115+
})
116+
}
117+
118+
sort.Slice(candidates, func(i, j int) bool {
119+
return candidates[i].EndTime.After(candidates[j].EndTime)
120+
})
121+
122+
return candidates
123+
}
124+
125+
func getLatestFinishTime(jobRuns map[string]*JobRunSummary) time.Time {
126+
var latestFinishTime time.Time
127+
128+
for _, jobRun := range jobRuns {
129+
if jobRun.JobEndTime != nil && (latestFinishTime.IsZero() || jobRun.JobEndTime.After(latestFinishTime)) {
130+
latestFinishTime = *jobRun.JobEndTime
131+
}
132+
}
133+
134+
return latestFinishTime
135+
}
136+
28137
func (j *JobLineageSummary) Flatten() []*JobExecutionSummary {
29138
var result []*JobExecutionSummary
30139
queue := []*JobLineageSummary{j}

core/scheduler/resolver/lineage_resolver.go

Lines changed: 1 addition & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package resolver
22

33
import (
44
"context"
5-
"sort"
65
"time"
76

87
"github.com/goto/salt/log"
@@ -12,11 +11,6 @@ import (
1211
"github.com/goto/optimus/internal/lib/window"
1312
)
1413

15-
const (
16-
// MaxLineageDepth is a safeguard to avoid infinite recursion in case of unexpected cycles
17-
MaxLineageDepth = 20
18-
)
19-
2014
type JobUpstreamRepository interface {
2115
GetAllResolvedUpstreams(context.Context) (map[scheduler.JobName][]scheduler.JobName, error)
2216
}
@@ -156,85 +150,12 @@ func (r *LineageResolver) buildSingleJobLineage(ctx context.Context, schedule *s
156150
}
157151

158152
if maxUpstreamsPerLevel > 0 {
159-
finalLineage = r.pruneLineage(finalLineage, maxUpstreamsPerLevel, 0)
153+
finalLineage = finalLineage.PruneLineage(maxUpstreamsPerLevel)
160154
}
161155

162156
return finalLineage, nil
163157
}
164158

165-
type upstreamCandidate struct {
166-
JobName scheduler.JobName
167-
EndTime time.Time
168-
}
169-
170-
func (r *LineageResolver) pruneLineage(lineage *scheduler.JobLineageSummary, maxUpstreamsPerLevel, depth int) *scheduler.JobLineageSummary {
171-
type nodeInfo struct {
172-
original *scheduler.JobLineageSummary
173-
pruned *scheduler.JobLineageSummary
174-
depth int
175-
}
176-
177-
rootPruned := &scheduler.JobLineageSummary{
178-
JobName: lineage.JobName,
179-
Tenant: lineage.Tenant,
180-
Window: lineage.Window,
181-
ScheduleInterval: lineage.ScheduleInterval,
182-
SLA: lineage.SLA,
183-
JobRuns: lineage.JobRuns,
184-
}
185-
186-
queue := []*nodeInfo{{
187-
original: lineage,
188-
pruned: rootPruned,
189-
depth: 0,
190-
}}
191-
192-
processed := make(map[scheduler.JobName]*scheduler.JobLineageSummary)
193-
processed[lineage.JobName] = queue[0].pruned
194-
195-
for len(queue) > 0 {
196-
current := queue[0]
197-
queue = queue[1:]
198-
199-
if current.depth >= MaxLineageDepth {
200-
continue
201-
}
202-
203-
upstreams := current.original.Upstreams
204-
candidates := extractUpstreamCandidatesSortedByDuration(current.original)
205-
206-
for i := 0; i < maxUpstreamsPerLevel && i < len(candidates); i++ {
207-
targetJobName := candidates[i].JobName
208-
for _, upstream := range upstreams {
209-
if upstream.JobName == targetJobName {
210-
if existingPruned, exists := processed[upstream.JobName]; exists {
211-
current.pruned.Upstreams = append(current.pruned.Upstreams, existingPruned)
212-
} else {
213-
prunedUpstream := &scheduler.JobLineageSummary{
214-
JobName: upstream.JobName,
215-
Tenant: upstream.Tenant,
216-
Window: upstream.Window,
217-
ScheduleInterval: upstream.ScheduleInterval,
218-
SLA: upstream.SLA,
219-
JobRuns: upstream.JobRuns,
220-
}
221-
current.pruned.Upstreams = append(current.pruned.Upstreams, prunedUpstream)
222-
processed[upstream.JobName] = prunedUpstream
223-
queue = append(queue, &nodeInfo{
224-
original: upstream,
225-
pruned: prunedUpstream,
226-
depth: current.depth + 1,
227-
})
228-
}
229-
break
230-
}
231-
}
232-
}
233-
}
234-
235-
return rootPruned
236-
}
237-
238159
func (r *LineageResolver) buildLineageTree(jobName scheduler.JobName, lineageData *LineageData, result map[scheduler.JobName]*scheduler.JobLineageSummary, depth int) *scheduler.JobLineageSummary {
239160
if _, ok := result[jobName]; ok {
240161
return result[jobName]
@@ -443,47 +364,6 @@ func (r *LineageResolver) populateLineageWithJobRuns(lineage *scheduler.JobLinea
443364
return result[lineage.JobName]
444365
}
445366

446-
func extractUpstreamCandidatesSortedByDuration(lineage *scheduler.JobLineageSummary) []upstreamCandidate {
447-
candidates := []upstreamCandidate{}
448-
449-
for _, upstream := range lineage.Upstreams {
450-
latestFinishTime := getLatestFinishTime(upstream.JobRuns)
451-
candidates = append(candidates, upstreamCandidate{
452-
JobName: upstream.JobName,
453-
EndTime: latestFinishTime,
454-
})
455-
}
456-
457-
sort.Slice(candidates, func(i, j int) bool {
458-
return candidates[i].EndTime.After(candidates[j].EndTime)
459-
})
460-
461-
return candidates
462-
}
463-
464-
func getLatestFinishTime(jobRuns map[string]*scheduler.JobRunSummary) time.Time {
465-
var latestFinishTime time.Time
466-
467-
for _, jobRun := range jobRuns {
468-
if jobRun.JobEndTime != nil && (latestFinishTime.IsZero() || jobRun.JobEndTime.After(latestFinishTime)) {
469-
latestFinishTime = *jobRun.JobEndTime
470-
}
471-
}
472-
473-
return latestFinishTime
474-
}
475-
476-
func copyJobRuns(source map[string]*scheduler.JobRunSummary) map[string]*scheduler.JobRunSummary {
477-
if source == nil {
478-
return make(map[string]*scheduler.JobRunSummary)
479-
}
480-
result := make(map[string]*scheduler.JobRunSummary, len(source))
481-
for key, jobRun := range source {
482-
result[key] = copyJobRun(jobRun)
483-
}
484-
return result
485-
}
486-
487367
func copyJobRun(source *scheduler.JobRunSummary) *scheduler.JobRunSummary {
488368
if source == nil {
489369
return nil

0 commit comments

Comments
 (0)