Skip to content

Commit d2ddde9

Browse files
Add groups to jobs (New) (#2077)
* Added some preliminary tests * Some changes to the group functionality * Cleaning up some logs * Some changes for jobs * Added groups and after to after-suspend-jobs * some intermediate changes * Working version of depmgr * Cleaned external deps * Stop ignoring cycle errors * Added tests for depmgr * Added internal cycle test * Added metabox tests * Fixed metabox test errors * Cleaned up depmgr * Added tests for new fields * fixed fstring * Revert "Stop ignoring cycle errors" This reverts commit a3426cf. * Removed raising error with Dependency error * Using old behavior with metabox tests * Added review comments
1 parent e265d7d commit d2ddde9

File tree

7 files changed

+542
-47
lines changed

7 files changed

+542
-47
lines changed

checkbox-ng/plainbox/impl/depmgr.py

Lines changed: 203 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# This file is part of Checkbox.
22
#
3-
# Copyright 2012-2015 Canonical Ltd.
3+
# Copyright 2012-2025 Canonical Ltd.
44
# Written by:
55
# Zygmunt Krynicki <[email protected]>
6+
# Fernando Bravo <[email protected]>
67
#
78
# Checkbox is free software: you can redistribute it and/or modify
89
# it under the terms of the GNU General Public License version 3,
@@ -34,9 +35,19 @@
3435
import enum
3536

3637
from plainbox.i18n import gettext as _
38+
from plainbox.impl.job import JobDefinition
3739

3840
logger = getLogger("plainbox.depmgr")
3941

42+
GROUP_PREFIX = "_group_job_"
43+
44+
try:
45+
removeprefix = str.removeprefix
46+
except AttributeError:
47+
48+
def removeprefix(s, prefix):
49+
return s.split(prefix, maxsplit=1)[1]
50+
4051

4152
class DependencyType(enum.Enum):
4253
"""
@@ -130,7 +141,7 @@ def __hash__(self):
130141
class DependencyCycleError(DependencyError):
131142
"""Exception raised when a cyclic dependency is detected."""
132143

133-
def __init__(self, job_list):
144+
def __init__(self, job_list, groups=None):
134145
"""
135146
Initialize with a list of jobs that form a dependency loop.
136147
@@ -144,6 +155,7 @@ def __init__(self, job_list):
144155
assert len(job_list) > 1
145156
assert job_list[0] is job_list[-1]
146157
self.job_list = job_list
158+
self.groups = groups
147159

148160
@property
149161
def affected_job(self):
@@ -152,7 +164,14 @@ def affected_job(self):
152164
153165
Here it is the job that has a cyclic dependency on itself.
154166
"""
155-
return self.job_list[0]
167+
job = self.job_list[0]
168+
# If we are in a group, we don't know which job is the one that
169+
# is affected by the cycle, so we return the first job in the list.
170+
# This is not ideal but it is the best we can do.
171+
if job.id.startswith(GROUP_PREFIX):
172+
name = removeprefix(job.id, GROUP_PREFIX)
173+
job = self.groups[name].jobs[0]
174+
return job
156175

157176
@property
158177
def affecting_job(self):
@@ -275,6 +294,15 @@ def __repr__(self):
275294
)
276295

277296

297+
class Group(object):
298+
def __init__(self, name, jobs=None, external_deps=None):
299+
self.name = name
300+
self.jobs = [] if jobs is None else list(jobs)
301+
self.external_deps = (
302+
[] if external_deps is None else set(external_deps)
303+
)
304+
305+
278306
class State(enum.Enum):
279307
"""
280308
States for recursive DFS graph visitor.
@@ -320,7 +348,7 @@ def resolve_dependencies(cls, job_list, visit_list=None):
320348
if a duplicate job definition is present
321349
:raises DependencyCycleError:
322350
if a cyclic dependency is present.
323-
:raises DependencyMissingErorr:
351+
:raises DependencyMissingError:
324352
if a required job does not exist.
325353
"""
326354
return cls(job_list)._solve(visit_list)
@@ -339,14 +367,22 @@ def __init__(self, job_list):
339367

340368
# Solution for all the dependencies that may pull in new jobs
341369
self._pull_solution = []
342-
# The computed solution for all dependencies
370+
# Intermediate solution for ordering dependencies
343371
self._order_solution = []
344372

345-
def _clear_state_map(self):
373+
# Create some variables to handle grouping
374+
self._groups = dict()
375+
self._jobs_in_groups = dict()
376+
346377
self._job_state_map = {
347378
job.id: State.NOT_VISITED for job in self._job_list
348379
}
349380

381+
def _clear_state_map(self):
382+
self._job_state_map = {
383+
k: State.NOT_VISITED for k in self._job_state_map.keys()
384+
}
385+
350386
def _solve(self, visit_list=None):
351387
"""
352388
Internal method of DependencySolver.
@@ -373,28 +409,53 @@ def _solve(self, visit_list=None):
373409
self._visit_list = visit_list
374410

375411
# Solve first for pulling dependencies
376-
self._clear_state_map()
377-
for job in self._visit_list:
378-
self._visit(job, pull=True)
412+
pull_solution = self._solve_pull_deps(self._visit_list)
379413

380414
# Create a map of pulled jobs
381-
self._pulled_map = self._get_job_map(self._pull_solution)
415+
self._pulled_map = self._get_job_map(pull_solution)
382416
# Add the before dependencies for the jobs in the map
383-
for job in self._pull_solution:
417+
for job in pull_solution:
384418
job.controller.add_before_deps(
385419
job, self._pulled_map, self._job_map
386420
)
387421

388-
# Solve again for order dependencies, using the pulled jobs as the
389-
# new visit list
390-
self._clear_state_map()
391-
for job in self._pull_solution:
392-
self._visit(job)
422+
# Look for groups in the pulled map
423+
self.create_groups(pull_solution)
424+
425+
# If there are no groups declared in the pulled jobs, solve the
426+
# ordering normally
427+
if not self._groups:
428+
final_solution = self._solve_order_deps(pull_solution)
429+
430+
# If there are any groups, solve for ordering considering them
431+
else:
432+
# Replace the jobs in the pulled map with the group job
433+
replaced_solution = self.replace_jobs_by_groups(pull_solution)
434+
435+
# Solve again for order dependencies
436+
general_solution = self._solve_order_deps(
437+
replaced_solution, group=None
438+
)
439+
440+
# Solve internally for each group
441+
group_solutions = {}
442+
for group in self._groups.values():
443+
# Get the jobs in the group from the map of pulled jobs
444+
name = group.name
445+
446+
group_solutions[name] = self._solve_order_deps(
447+
group.jobs, group=name
448+
)
449+
450+
# Replace the group jobs with the original jobs inside the group
451+
final_solution = self.replace_groups_by_jobs(
452+
general_solution, group_solutions
453+
)
393454

394455
# Perform a sanity check to ensure that no jobs have been added or
395456
# removed from the solution.
396-
pull_jobs = set(self._pull_solution)
397-
order_jobs = set(self._order_solution)
457+
pull_jobs = set(pull_solution)
458+
order_jobs = set(final_solution)
398459
if pull_jobs != order_jobs:
399460
raise ValueError(
400461
"The dependency manager failed ordering the jobs, some jobs "
@@ -403,12 +464,25 @@ def _solve(self, visit_list=None):
403464
"Order solution: {!r}".format(pull_jobs, order_jobs)
404465
)
405466

406-
logger.debug(_("Done solving"))
467+
return final_solution
468+
469+
def _solve_pull_deps(self, visit_list):
470+
self._clear_state_map()
471+
self._pull_solution = []
472+
for job in visit_list:
473+
self._visit(job, pull=True)
474+
475+
return self._pull_solution
476+
477+
def _solve_order_deps(self, visit_list, group=None):
478+
self._clear_state_map()
479+
self._order_solution = []
480+
for job in visit_list:
481+
self._visit(job, group=group)
407482

408-
# Return the final solution
409483
return self._order_solution
410484

411-
def _visit(self, job, trail=None, pull=False):
485+
def _visit(self, job, trail=None, pull=False, group=None):
412486
"""
413487
Internal method of DependencySolver.
414488
@@ -455,9 +529,7 @@ def _visit(self, job, trail=None, pull=False):
455529
# dependencies that are resource, depends or after.
456530
self._pull_visit(job, trail)
457531
else:
458-
# If this is an order operation we only care about the
459-
# dependencies that are depends or before.
460-
self._order_visit(job, trail)
532+
self._order_visit(job, trail, group=group)
461533

462534
elif state == State.VISITED:
463535
# This node is not fully traced yet but has been visited already
@@ -466,7 +538,7 @@ def _visit(self, job, trail=None, pull=False):
466538
# forms a loop
467539
trail = trail[trail.index(job) :]
468540
logger.debug(_("Found dependency cycle: %r"), trail)
469-
raise DependencyCycleError(trail)
541+
raise DependencyCycleError(trail, self._groups)
470542

471543
elif state == State.FINISHED:
472544
# This node has been visited and is fully traced.
@@ -516,15 +588,27 @@ def _pull_visit(self, job, trail=None):
516588

517589
# We've visited (recursively) all dependencies of this node, so we
518590
# can change the state to finished and append it to the solution
519-
logger.debug(_("Appending %r to pull solution"), job)
520591
self._job_state_map[job.id] = State.FINISHED
521592
self._pull_solution.append(job)
522593

523-
def _order_visit(self, job, trail=None):
594+
def _order_visit(self, job, trail=None, group=None):
524595
# We travel through dependencies recursively
525596
for dep_type, job_id in job.controller.get_dependency_set(
526597
job, self._pull_solution
527598
):
599+
# Check if we are ordering a group
600+
if group is None:
601+
# If the dependency is pointing to a job inside a group, we
602+
# replace it with the group job.
603+
if job_id in self._jobs_in_groups:
604+
group_name = self._jobs_in_groups[job_id]
605+
job_id = "{}{}".format(GROUP_PREFIX, group_name)
606+
else:
607+
# If we are in a group, we only care about the dependencies
608+
# inside the group
609+
if self._jobs_in_groups.get(job_id) != group:
610+
continue
611+
528612
try:
529613
# We look up the job only in the map of pulled jobs
530614
next_job = self._pulled_map[job_id]
@@ -541,15 +625,105 @@ def _order_visit(self, job, trail=None):
541625
# Visit the dependency and update the trail
542626
logger.debug(_("Visiting dependency: %r"), next_job)
543627
trail.append(next_job)
544-
self._visit(next_job, trail, pull=False)
628+
self._visit(next_job, trail, pull=False, group=group)
545629
trail.pop()
546630

547631
# We've visited (recursively) all dependencies of this node, so we
548632
# can change the state to finished and append it to the solution
549-
logger.debug(_("Appending %r to order solution"), job)
550633
self._job_state_map[job.id] = State.FINISHED
551634
self._order_solution.append(job)
552635

636+
def create_groups(self, solution):
637+
"""
638+
Create the groups that are used in the list of pulled jobs.
639+
"""
640+
self._groups = {}
641+
self._jobs_in_groups = {}
642+
643+
for job in solution:
644+
# If the job is not in a group, skip it
645+
if not job.group:
646+
continue
647+
# Else, add it to the group dicts
648+
if job.group not in self._groups:
649+
self._groups[job.group] = Group(job.group)
650+
self._groups[job.group].jobs.append(job)
651+
self._jobs_in_groups[job.id] = job.group
652+
653+
for group in self._groups.values():
654+
group.external_deps = self.get_external_dependencies(group)
655+
656+
def get_external_dependencies(self, group):
657+
"""
658+
Get the external dependencies for the groups.
659+
The external dependencies are the dependencies of the jobs inside the
660+
group that don't point to the jobs inside the group
661+
"""
662+
external_deps = set()
663+
for job in group.jobs:
664+
# Get the dependencies for the job
665+
job_id = job.id
666+
deps = job.controller.get_dependency_set(job, self._pulled_map)
667+
# Filter out the dependencies that are not external
668+
for dep_type, job_id in deps:
669+
if self._jobs_in_groups.get(job_id) != group.name:
670+
external_deps.add(job_id)
671+
return external_deps
672+
673+
def replace_jobs_by_groups(self, solution):
674+
"""
675+
Replace the jobs in the pulled map with the group jobs.
676+
"""
677+
added_groups = set()
678+
679+
def replace_iter():
680+
for job in solution:
681+
group_name = self._jobs_in_groups.get(job.id)
682+
683+
# Not in any group: continue
684+
if group_name is None:
685+
yield job
686+
continue
687+
688+
# Already in a group: skip
689+
if group_name in added_groups:
690+
continue
691+
692+
# Create the group job
693+
group_job_id = "{}{}".format(GROUP_PREFIX, group_name)
694+
# Add external dependencies as dependencies of the group job
695+
deps = self._groups[group_name].external_deps
696+
group_job = JobDefinition(
697+
{"id": group_job_id, "after": " ".join(deps)}
698+
)
699+
700+
# Add the group job to the pulled map and job state map
701+
self._pulled_map[group_job_id] = group_job
702+
self._job_state_map[group_job_id] = State.NOT_VISITED
703+
704+
added_groups.add(group_name)
705+
yield group_job
706+
707+
return list(replace_iter())
708+
709+
def replace_groups_by_jobs(self, solution, group_solutions):
710+
"""
711+
Replace the temporary group jobs with the original jobs inside the
712+
group.
713+
"""
714+
715+
def replace_iter():
716+
for job in solution:
717+
if job.id.startswith(GROUP_PREFIX):
718+
# Remove the prefix and get the group name
719+
name = removeprefix(job.id, GROUP_PREFIX)
720+
# Add the jobs from the group
721+
yield from group_solutions.get(name, [job])
722+
else:
723+
yield job
724+
725+
return list(replace_iter())
726+
553727
@staticmethod
554728
def _get_job_map(job_list):
555729
"""

checkbox-ng/plainbox/impl/session/state.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,12 +1178,19 @@ def _add_job_siblings_unit(self, new_job, recompute, via):
11781178
data["flags"] = data["flags"].replace(Suspend.AUTO_FLAG, "")
11791179
data["flags"] = data["flags"].replace(Suspend.MANUAL_FLAG, "")
11801180
data["id"] = "after-suspend-{}".format(new_job.partial_id)
1181+
11811182
data["_summary"] = "{} after suspend (S3)".format(new_job.summary)
11821183
if new_job.depends:
11831184
data["depends"] += " {}".format(new_job.id)
11841185
else:
11851186
data["depends"] = "{}".format(new_job.id)
11861187
data["depends"] += " {}".format(Suspend.AUTO_JOB_ID)
1188+
if new_job.after:
1189+
data["after"] += " {}".format(new_job.id)
1190+
else:
1191+
data["after"] = "{}".format(new_job.id)
1192+
if new_job.group:
1193+
data["group"] = "after-suspend-{}".format(new_job.group)
11871194
self._add_job_unit(
11881195
JobDefinition(
11891196
data,
@@ -1211,6 +1218,12 @@ def _add_job_siblings_unit(self, new_job, recompute, via):
12111218
else:
12121219
data["depends"] = "{}".format(new_job.id)
12131220
data["depends"] += " {}".format(Suspend.MANUAL_JOB_ID)
1221+
if new_job.after:
1222+
data["after"] += " {}".format(new_job.id)
1223+
else:
1224+
data["after"] = "{}".format(new_job.id)
1225+
if new_job.group:
1226+
data["group"] = "after-suspend-{}".format(new_job.group)
12141227
self._add_job_unit(
12151228
JobDefinition(
12161229
data,

0 commit comments

Comments
 (0)