Skip to content

Commit 37c6b37

Browse files
committed
first go of jobset working
We were not able to use the JobSet networking, so we fell back to our custom headless service with a job-group selector. Signed-off-by: vsoch <[email protected]>
1 parent b78ae56 commit 37c6b37

File tree

16 files changed

+176
-133
lines changed

16 files changed

+176
-133
lines changed

.github/workflows/main.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ jobs:
101101
fi
102102
make deploy-local
103103
minikube image load ghcr.io/flux-framework/flux-operator:test
104+
VERSION=v0.1.3
105+
kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
104106
kubectl apply -f examples/dist/flux-operator-local.yaml
105107
106108
- name: Test ${{ matrix.test[0] }}

.github/workflows/test-python.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ jobs:
7171
minikube ssh docker pull ${container}
7272
make deploy-local
7373
minikube image load ghcr.io/flux-framework/flux-operator:test
74+
VERSION=v0.1.3
75+
kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
7476
kubectl apply -f examples/dist/flux-operator-local.yaml
7577
7678
- name: Test ${{ matrix.test[0] }}

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ clean:
165165
kubectl delete -n flux-operator pods --all --grace-period=0 --force
166166
kubectl delete -n flux-operator pvc --all --grace-period=0 --force
167167
kubectl delete -n flux-operator pv --all --grace-period=0 --force
168+
kubectl delete -n flux-operator jobset --all --grace-period=0 --force
168169
kubectl delete -n flux-operator jobs --all --grace-period=0 --force
169170
kubectl delete -n flux-operator MiniCluster --all --grace-period=0 --force
170171

chart/templates/manager-rbac.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,32 @@ rules:
273273
- patch
274274
- update
275275
- watch
276+
- apiGroups:
277+
- jobset.x-k8s.io
278+
resources:
279+
- jobsets
280+
verbs:
281+
- create
282+
- delete
283+
- get
284+
- list
285+
- patch
286+
- update
287+
- watch
288+
- apiGroups:
289+
- jobset.x-k8s.io
290+
resources:
291+
- jobsets/finalizers
292+
verbs:
293+
- update
294+
- apiGroups:
295+
- jobset.x-k8s.io
296+
resources:
297+
- jobsets/status
298+
verbs:
299+
- get
300+
- patch
301+
- update
276302
- apiGroups:
277303
- networking.k8s.io
278304
resources:

config/rbac/role.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,32 @@ rules:
273273
- patch
274274
- update
275275
- watch
276+
- apiGroups:
277+
- jobset.x-k8s.io
278+
resources:
279+
- jobsets
280+
verbs:
281+
- create
282+
- delete
283+
- get
284+
- list
285+
- patch
286+
- update
287+
- watch
288+
- apiGroups:
289+
- jobset.x-k8s.io
290+
resources:
291+
- jobsets/finalizers
292+
verbs:
293+
- update
294+
- apiGroups:
295+
- jobset.x-k8s.io
296+
resources:
297+
- jobsets/status
298+
verbs:
299+
- get
300+
- patch
301+
- update
276302
- apiGroups:
277303
- networking.k8s.io
278304
resources:

controllers/flux/containers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func (r *MiniClusterReconciler) getContainers(
2323
specs []api.MiniClusterContainer,
2424
defaultName string,
2525
mounts []corev1.VolumeMount,
26+
entrypoint string,
2627
) ([]corev1.Container, error) {
2728

2829
// Create the containers for the pod
@@ -45,7 +46,7 @@ func (r *MiniClusterReconciler) getContainers(
4546
if container.RunFlux {
4647

4748
// wait.sh path corresponds to container identifier
48-
waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
49+
waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", entrypoint, i)
4950
command = []string{"/bin/bash", waitScript, container.Command}
5051
containerName = defaultName
5152
}

controllers/flux/job.go

Lines changed: 0 additions & 100 deletions
This file was deleted.

controllers/flux/jobset.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,38 @@ import (
1717

1818
api "flux-framework/flux-operator/api/v1alpha1"
1919

20+
ctrl "sigs.k8s.io/controller-runtime"
2021
jobset "sigs.k8s.io/jobset/api/v1alpha1"
2122
)
2223

2324
func (r *MiniClusterReconciler) newJobSet(
2425
cluster *api.MiniCluster,
2526
) (*jobset.JobSet, error) {
2627

27-
suspend := true
28+
// I don't really understand how this works, but it seems to be
29+
// not creating any pods? So bad idea?
30+
suspend := false
2831
jobs := jobset.JobSet{
2932
ObjectMeta: metav1.ObjectMeta{
30-
Name: cluster.Name,
33+
Name: "minicluster",
3134
Namespace: cluster.Namespace,
3235
Labels: cluster.Spec.JobLabels,
3336
},
3437
Spec: jobset.JobSetSpec{
3538

3639
// Suspend child jobs (the worker pods) when broker finishes
40+
// How do I define a child job?
3741
Suspend: &suspend,
3842
// TODO decide on FailurePolicy here
3943
// default is to fail if all jobs in jobset fail
4044
},
4145
}
4246

4347
// Get leader broker job, the parent in the JobSet (worker or follower pods)
48+
// Both are required to be in indexed completion mode to have a service!
49+
// I'm not sure that totally makes sense, but ok!
4450
// cluster, size, entrypoint, indexed
45-
leaderJob, err := r.getJob(cluster, 1, "broker", false)
51+
leaderJob, err := r.getJob(cluster, 1, "broker", true)
4652
if err != nil {
4753
return &jobs, err
4854
}
@@ -51,10 +57,11 @@ func (r *MiniClusterReconciler) newJobSet(
5157
return &jobs, err
5258
}
5359
jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob}
60+
ctrl.SetControllerReference(cluster, &jobs, r.Scheme)
5461
return &jobs, nil
5562
}
5663

57-
// getBrokerJob creates the job for the main leader broker
64+
// getJob creates a job for a main leader (broker) or worker (followers)
5865
func (r *MiniClusterReconciler) getJob(
5966
cluster *api.MiniCluster,
6067
size int32,
@@ -64,14 +71,13 @@ func (r *MiniClusterReconciler) getJob(
6471

6572
backoffLimit := int32(100)
6673
podLabels := r.getPodLabels(cluster)
67-
enableDNSHostnames := true
74+
enableDNSHostnames := false
6875
completionMode := batchv1.NonIndexedCompletion
6976

7077
if indexed {
7178
completionMode = batchv1.IndexedCompletion
7279
}
7380

74-
// TODO how are these named
7581
job := jobset.ReplicatedJob{
7682
Name: cluster.Name + "-" + entrypoint,
7783

@@ -110,7 +116,7 @@ func (r *MiniClusterReconciler) getJob(
110116
},
111117
Spec: corev1.PodSpec{
112118
// matches the service
113-
// Subdomain: restfulServiceName,
119+
Subdomain: restfulServiceName,
114120
Volumes: getVolumes(cluster, entrypoint),
115121
RestartPolicy: corev1.RestartPolicyOnFailure,
116122
ImagePullSecrets: getImagePullSecrets(cluster),
@@ -130,7 +136,12 @@ func (r *MiniClusterReconciler) getJob(
130136

131137
// Get volume mounts, add on container specific ones
132138
mounts := getVolumeMounts(cluster)
133-
containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts)
139+
containers, err := r.getContainers(
140+
cluster.Spec.Containers,
141+
cluster.Name,
142+
mounts,
143+
entrypoint,
144+
)
134145
jobspec.Template.Spec.Containers = containers
135146
job.Template.Spec = jobspec
136147
return job, err

controllers/flux/minicluster.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,11 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
8989
}
9090

9191
// Create headless service for the MiniCluster
92-
// We should not technically need this anymore.
93-
// TODO I need to test the cluster, but I can't get the JobSet working
94-
//selector := map[string]string{"job-name": cluster.Name}
95-
//result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
96-
//if err != nil {
97-
// return result, err
98-
//}
92+
selector := map[string]string{"job-group": cluster.Name}
93+
result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
94+
if err != nil {
95+
return result, err
96+
}
9997

10098
// Create the batch job that brings it all together!
10199
// A batchv1.Job can hold a spec for containers that use the configs we just made
@@ -452,16 +450,19 @@ func (r *MiniClusterReconciler) getConfigMap(
452450
func generateHostlist(cluster *api.MiniCluster, size int) string {
453451

454452
// The hosts are generated through the max size, so the cluster can expand
455-
return fmt.Sprintf("%s-[%s]", cluster.Name, generateRange(size))
453+
// minicluster-flux-sample-broker-0-0
454+
// minicluster-flux-sample-worker-0-1 through 0-3 for a size 4 cluster
455+
return fmt.Sprintf("minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]", cluster.Name, cluster.Name, generateRange(size-1))
456456
}
457457

458458
// generateFluxConfig creates the broker.toml file used to bootstrap flux
459459
func generateFluxConfig(cluster *api.MiniCluster) string {
460460

461461
// The hosts are generated through the max size, so the cluster can expand
462+
brokerFqdn := fmt.Sprintf("minicluster-%s-broker-0-0", cluster.Name)
462463
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", restfulServiceName, cluster.Namespace)
463-
hosts := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.MaxSize)))
464-
fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, cluster.Name, hosts)
464+
hosts := fmt.Sprintf("%s, minicluster-%s-worker-0-[%s]", brokerFqdn, cluster.Name, generateRange(int(cluster.Spec.MaxSize-1)))
465+
fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, hosts)
465466
fluxConfig += "\n" + brokerArchiveSection
466467
return fluxConfig
467468
}

controllers/flux/minicluster_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ func NewMiniClusterReconciler(
8989
//+kubebuilder:rbac:groups=networking.k8s.io,resources="ingresses",verbs=get;list;watch;create;update;patch;delete
9090

9191
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
92+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete
93+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
94+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update
9295
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete;exec
9396
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;list;watch;create;update;patch;delete;exec
9497

0 commit comments

Comments
 (0)