Skip to content

Commit b78ae56

Browse files
committed
start of work to add jobset
Signed-off-by: vsoch <[email protected]>
1 parent a045141 commit b78ae56

File tree

18 files changed

+573
-1039
lines changed

18 files changed

+573
-1039
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Build the manager binary
2-
FROM golang:1.18 as builder
2+
FROM golang:1.20 as builder
33

44
WORKDIR /workspace
55

controllers/flux/job.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ func (r *MiniClusterReconciler) newMiniClusterJob(
5656
},
5757
Spec: corev1.PodSpec{
5858
// matches the service
59-
Subdomain: restfulServiceName,
60-
SetHostnameAsFQDN: &setAsFQDN,
61-
Volumes: getVolumes(cluster),
59+
Subdomain: restfulServiceName,
60+
SetHostnameAsFQDN: &setAsFQDN,
61+
// Not currently in use, commented out for now
62+
//Volumes: getVolumes(cluster),
6263
RestartPolicy: corev1.RestartPolicyOnFailure,
6364
ImagePullSecrets: getImagePullSecrets(cluster),
6465
ServiceAccountName: cluster.Spec.Pod.ServiceAccountName,

controllers/flux/jobset.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
Copyright 2023 Lawrence Livermore National Security, LLC
3+
(c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
5+
This is part of the Flux resource manager framework.
6+
For details, see https://github.com/flux-framework.
7+
8+
SPDX-License-Identifier: Apache-2.0
9+
*/
10+
11+
package controllers
12+
13+
import (
14+
batchv1 "k8s.io/api/batch/v1"
15+
corev1 "k8s.io/api/core/v1"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
18+
api "flux-framework/flux-operator/api/v1alpha1"
19+
20+
jobset "sigs.k8s.io/jobset/api/v1alpha1"
21+
)
22+
23+
func (r *MiniClusterReconciler) newJobSet(
24+
cluster *api.MiniCluster,
25+
) (*jobset.JobSet, error) {
26+
27+
suspend := true
28+
jobs := jobset.JobSet{
29+
ObjectMeta: metav1.ObjectMeta{
30+
Name: cluster.Name,
31+
Namespace: cluster.Namespace,
32+
Labels: cluster.Spec.JobLabels,
33+
},
34+
Spec: jobset.JobSetSpec{
35+
36+
// Suspend child jobs (the worker pods) when broker finishes
37+
Suspend: &suspend,
38+
// TODO decide on FailurePolicy here
39+
// default is to fail if all jobs in jobset fail
40+
},
41+
}
42+
43+
// Get leader broker job, the parent in the JobSet (worker or follower pods)
44+
// cluster, size, entrypoint, indexed
45+
leaderJob, err := r.getJob(cluster, 1, "broker", false)
46+
if err != nil {
47+
return &jobs, err
48+
}
49+
workerJob, err := r.getJob(cluster, cluster.Spec.Size-1, "worker", true)
50+
if err != nil {
51+
return &jobs, err
52+
}
53+
jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob}
54+
return &jobs, nil
55+
}
56+
57+
// getBrokerJob creates the job for the main leader broker
58+
func (r *MiniClusterReconciler) getJob(
59+
cluster *api.MiniCluster,
60+
size int32,
61+
entrypoint string,
62+
indexed bool,
63+
) (jobset.ReplicatedJob, error) {
64+
65+
backoffLimit := int32(100)
66+
podLabels := r.getPodLabels(cluster)
67+
enableDNSHostnames := true
68+
completionMode := batchv1.NonIndexedCompletion
69+
70+
if indexed {
71+
completionMode = batchv1.IndexedCompletion
72+
}
73+
74+
// TODO how are these named
75+
job := jobset.ReplicatedJob{
76+
Name: cluster.Name + "-" + entrypoint,
77+
78+
// Allow pods to be reached by their hostnames! A simple boolean! Chef's kiss!
79+
// <jobSet.name>-<spec.replicatedJob.name>-<job-index>-<pod-index>.<jobSet.name>-<spec.replicatedJob.name>
80+
Network: &jobset.Network{
81+
EnableDNSHostnames: &enableDNSHostnames,
82+
},
83+
84+
Template: batchv1.JobTemplateSpec{
85+
ObjectMeta: metav1.ObjectMeta{
86+
Name: cluster.Name,
87+
Namespace: cluster.Namespace,
88+
Labels: cluster.Spec.JobLabels,
89+
},
90+
},
91+
// This is the default, but let's be explicit
92+
Replicas: 1,
93+
}
94+
95+
// Create the JobSpec for the job -> Template -> Spec
96+
jobspec := batchv1.JobSpec{
97+
BackoffLimit: &backoffLimit,
98+
Completions: &size,
99+
Parallelism: &size,
100+
CompletionMode: &completionMode,
101+
ActiveDeadlineSeconds: &cluster.Spec.DeadlineSeconds,
102+
103+
// Note there is parameter to limit runtime
104+
Template: corev1.PodTemplateSpec{
105+
ObjectMeta: metav1.ObjectMeta{
106+
Name: cluster.Name,
107+
Namespace: cluster.Namespace,
108+
Labels: podLabels,
109+
Annotations: cluster.Spec.Pod.Annotations,
110+
},
111+
Spec: corev1.PodSpec{
112+
// matches the service
113+
// Subdomain: restfulServiceName,
114+
Volumes: getVolumes(cluster, entrypoint),
115+
RestartPolicy: corev1.RestartPolicyOnFailure,
116+
ImagePullSecrets: getImagePullSecrets(cluster),
117+
ServiceAccountName: cluster.Spec.Pod.ServiceAccountName,
118+
NodeSelector: cluster.Spec.Pod.NodeSelector,
119+
},
120+
},
121+
}
122+
// Get resources for the pod
123+
resources, err := r.getPodResources(cluster)
124+
r.log.Info("🌀 MiniCluster", "Pod.Resources", resources)
125+
if err != nil {
126+
r.log.Info("🌀 MiniCluster", "Pod.Resources", resources)
127+
return job, err
128+
}
129+
jobspec.Template.Spec.Overhead = resources
130+
131+
// Get volume mounts, add on container specific ones
132+
mounts := getVolumeMounts(cluster)
133+
containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts)
134+
jobspec.Template.Spec.Containers = containers
135+
job.Template.Spec = jobspec
136+
return job, err
137+
}

0 commit comments

Comments
 (0)