71 changes: 65 additions & 6 deletions test/e2e/mpi_job_test.go
@@ -168,7 +168,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})

ginkgo.It("should not be updated when managed externaly, only created", func() {
ginkgo.It("should not be updated when managed externally, only created", func() {
mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
ctx := context.Background()
mpiJob = createJob(ctx, mpiJob)
@@ -352,7 +352,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
// Set up the scheduler-plugins.
setUpSchedulerPlugins()
// Set up the mpi-operator so that scheduler-plugins is used as the gang scheduler.
setupMPIOperator(ctx, mpiJob, enableGangSchedulingFlag, unschedulableResources)
setupMPIOperator(ctx, mpiJob, unschedulableResources, enableGangSchedulingFlag)
})

ginkgo.AfterEach(func() {
@@ -447,7 +447,7 @@ var _ = ginkgo.Describe("MPIJob", func() {
// Set up the volcano-scheduler.
setupVolcanoScheduler()
// Set up the mpi-operator so that the volcano scheduler is used as the gang scheduler.
setupMPIOperator(ctx, mpiJob, enableGangSchedulingFlag, unschedulableResources)
setupMPIOperator(ctx, mpiJob, unschedulableResources, enableGangSchedulingFlag)
})

ginkgo.AfterEach(func() {
@@ -527,6 +527,61 @@ var _ = ginkgo.Describe("MPIJob", func() {
}, foreverTimeout, waitInterval).Should(gomega.Equal(corev1.ConditionTrue))
})
})

// The custom cluster-domain e2e tests.
ginkgo.Context("with custom cluster-domain", func() {
const (
clusterDomainFlag = "--cluster-domain=cluster.local"
allowRunAsRootOpt = "--allow-run-as-root"
)

var ctx = context.Background()

ginkgo.BeforeEach(func() {
setupMPIOperator(ctx, mpiJob, nil, clusterDomainFlag)
mpiJob.Spec.RunLauncherAsWorker = ptr.To(true)
launcherContainer := &mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0]
launcherContainer.Command = append(launcherContainer.Command, allowRunAsRootOpt)
})

ginkgo.AfterEach(func() {
operator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
oldOperator := operator.DeepCopy()
for i, arg := range operator.Spec.Template.Spec.Containers[0].Args {
if arg == clusterDomainFlag {
operator.Spec.Template.Spec.Containers[0].Args = append(
operator.Spec.Template.Spec.Containers[0].Args[:i], operator.Spec.Template.Spec.Containers[0].Args[i+1:]...)
break
}
}
if diff := cmp.Diff(oldOperator, operator); len(diff) != 0 {
_, err = k8sClient.AppsV1().Deployments(mpiOperator).Update(ctx, operator, metav1.UpdateOptions{})
gomega.Expect(err).Should(gomega.Succeed())
gomega.Eventually(func() bool {
ok, err := ensureDeploymentAvailableReplicas(ctx, mpiOperator, mpiOperator)
gomega.Expect(err).Should(gomega.Succeed())
return ok
}, foreverTimeout, waitInterval).Should(gomega.BeTrue())
}
// Restore the previous MPIJob configurations.
mpiJob.Spec.RunLauncherAsWorker = ptr.To(false)
for i, arg := range mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].Command {
if arg == allowRunAsRootOpt {
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].Command = append(
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].Command[:i],
mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers[0].Command[i+1:]...)
}
}
})

ginkgo.When("running as root", func() {
ginkgo.It("should succeed", func() {
mpiJob := createJobAndWaitForCompletion(mpiJob)
expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
})
})
})
})
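
For context on what this new suite exercises: a pod behind a headless Service is reachable at <pod>.<service>.<namespace>.svc.<cluster-domain>, and the --cluster-domain flag being tested here presumably controls which DNS suffix the operator uses when it builds such hostnames for the launcher and workers. A minimal sketch of that addressing scheme, using a hypothetical helper that is not part of this change:

// Sketch only: the standard Kubernetes DNS name for a pod behind a headless
// Service, parameterized by the cluster domain. The helper name (workerFQDN)
// is illustrative and does not appear in this diff.
func workerFQDN(podName, serviceName, namespace, clusterDomain string) string {
	return fmt.Sprintf("%s.%s.%s.svc.%s", podName, serviceName, namespace, clusterDomain)
}

// For example, workerFQDN("pi-worker-0", "pi", "default", "cluster.local")
// yields "pi-worker-0.pi.default.svc.cluster.local".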

func resumeJob(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
@@ -761,7 +816,7 @@ func cleanUpVolcanoScheduler() {
}

// setupMPIOperator scales the mpi-operator deployment down and back up so that the supplied manager flags (for example the gang-scheduler configuration) take effect
func setupMPIOperator(ctx context.Context, mpiJob *kubeflow.MPIJob, enableGangSchedulingFlag string, unschedulableResources *corev1.ResourceList) {
func setupMPIOperator(ctx context.Context, mpiJob *kubeflow.MPIJob, unschedulableResources *corev1.ResourceList, managerFlags ...string) {
ginkgo.By("Scale-In the deployment to 0")
operator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
Expand All @@ -778,7 +833,7 @@ func setupMPIOperator(ctx context.Context, mpiJob *kubeflow.MPIJob, enableGangSc
gomega.Eventually(func() error {
updatedOperator, err := k8sClient.AppsV1().Deployments(mpiOperator).Get(ctx, mpiOperator, metav1.GetOptions{})
gomega.Expect(err).Should(gomega.Succeed())
updatedOperator.Spec.Template.Spec.Containers[0].Args = append(updatedOperator.Spec.Template.Spec.Containers[0].Args, enableGangSchedulingFlag)
updatedOperator.Spec.Template.Spec.Containers[0].Args = append(updatedOperator.Spec.Template.Spec.Containers[0].Args, managerFlags...)
updatedOperator.Spec.Replicas = ptr.To[int32](1)
_, err = k8sClient.AppsV1().Deployments(mpiOperator).Update(ctx, updatedOperator, metav1.UpdateOptions{})
return err
Expand All @@ -791,5 +846,9 @@ func setupMPIOperator(ctx context.Context, mpiJob *kubeflow.MPIJob, enableGangSc
return isNotZero
}, foreverTimeout, waitInterval).Should(gomega.BeTrue())
createMPIJobWithOpenMPI(mpiJob)
mpiJob.Spec.RunPolicy.SchedulingPolicy = &kubeflow.SchedulingPolicy{MinResources: unschedulableResources}
if unschedulableResources != nil {
mpiJob.Spec.RunPolicy.SchedulingPolicy = &kubeflow.SchedulingPolicy{
MinResources: unschedulableResources,
}
}
}
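
Because the manager flags are now a variadic parameter, callers can pass zero, one, or several flags in a single call. A brief illustration using identifiers from this diff (this particular combination does not appear in the tests above):

// Illustrative only: combining several manager flags in one setup call.
setupMPIOperator(ctx, mpiJob, nil, clusterDomainFlag)
setupMPIOperator(ctx, mpiJob, unschedulableResources, enableGangSchedulingFlag, clusterDomainFlag)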