diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala
index 2760714d7..43ca30ef2 100644
--- a/app/org/thp/cortex/services/JobRunnerSrv.scala
+++ b/app/org/thp/cortex/services/JobRunnerSrv.scala
@@ -27,6 +27,7 @@ class JobRunnerSrv @Inject() (
     artifactModel: ArtifactModel,
     processJobRunnerSrv: ProcessJobRunnerSrv,
     dockerJobRunnerSrv: DockerJobRunnerSrv,
+    k8sJobRunnerSrv: K8sJobRunnerSrv,
     workerSrv: WorkerSrv,
     createSrv: CreateSrv,
     updateSrv: UpdateSrv,
@@ -47,6 +48,7 @@ class JobRunnerSrv @Inject() (
       .getOrElse(Seq("docker", "process"))
       .map(_.toLowerCase)
       .collect {
+        case "kubernetes" if k8sJobRunnerSrv.isAvailable => "kubernetes"
         case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
         case "process" =>
           Seq("", "2", "3").foreach { pythonVersion =>
@@ -65,6 +67,7 @@ class JobRunnerSrv @Inject() (
 
   lazy val processRunnerIsEnable: Boolean = runners.contains("process")
   lazy val dockerRunnerIsEnable: Boolean  = runners.contains("docker")
+  lazy val k8sRunnerIsEnable: Boolean     = runners.contains("kubernetes")
 
   private object deleteVisitor extends SimpleFileVisitor[Path] {
     override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
@@ -218,6 +221,14 @@ class JobRunnerSrv @Inject() (
         maybeJobFolder = Some(jobFolder)
         runners
           .foldLeft[Option[Try[Unit]]](None) {
+            case (None, "kubernetes") =>
+              worker
+                .dockerImage()
+                .map(dockerImage => k8sJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes)))
+                .orElse {
+                  logger.warn(s"worker ${worker.id} cannot be run with Kubernetes (it has no Docker image)")
+                  None
+                }
             case (None, "docker") =>
              worker
                .dockerImage()
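The fold above tries the configured runners in order and falls through until one of them produces a result, and "kubernetes" only survives the `collect` when `K8sJobRunnerSrv.isAvailable` confirms the API server answers a version probe. For context, a configuration sketch enabling the new runner is shown below; the key names come from reference.conf and the entrypoint changes later in this patch, while the paths and PVC name are illustrative placeholders, not defaults. Note that `job.directory` must be the path where the same PVC is mounted inside the Cortex container, because `K8sJobRunnerSrv.run` relativizes each job folder against it to build the subpath mounted into the neuron pod.

    # application.conf (sketch) -- placeholder values, not shipped defaults
    job {
      runners = [kubernetes, docker, process]       # tried in this order for each job
      directory = "/opt/cortex/jobs"                # assumed mount point of the shared job PVC
      kubernetes {
        autoUpdate = true
        persistentVolumeClaimName = "cortex-jobs"   # assumed ReadWriteMany PVC name
      }
    }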
diff --git a/app/org/thp/cortex/services/K8SJobRunnerSrv.scala b/app/org/thp/cortex/services/K8SJobRunnerSrv.scala
new file mode 100644
index 000000000..97622d88f
--- /dev/null
+++ b/app/org/thp/cortex/services/K8SJobRunnerSrv.scala
@@ -0,0 +1,154 @@
+package org.thp.cortex.services
+
+import java.util.concurrent.TimeUnit
+import java.nio.file._
+
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Try
+import scala.collection.JavaConverters._
+
+import play.api.{Configuration, Logger}
+
+import akka.actor.ActorSystem
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.api.model.batch.{JobBuilder => KJobBuilder}
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSourceBuilder
+import javax.inject.{Inject, Singleton}
+import org.thp.cortex.models._
+
+@Singleton
+class K8sJobRunnerSrv(
+    client: DefaultKubernetesClient,
+    config: Configuration,
+    autoUpdate: Boolean,
+    jobBaseDirectory: Path,
+    persistentVolumeClaimName: String,
+    implicit val system: ActorSystem
+) {
+
+  @Inject()
+  def this(config: Configuration, system: ActorSystem) =
+    this(
+      new DefaultKubernetesClient(),
+      config,
+      config.getOptional[Boolean]("job.kubernetes.autoUpdate").getOrElse(true),
+      Paths.get(config.get[String]("job.directory")),
+      config.get[String]("job.kubernetes.persistentVolumeClaimName"),
+      system
+    )
+
+  lazy val logger = Logger(getClass)
+
+  lazy val isAvailable: Boolean =
+    Try {
+      val ver = client.getVersion()
+      logger.info(s"Kubernetes is available: major ${ver.getMajor()} minor ${ver.getMinor()} git ${ver.getGitCommit()}")
+      true
+    }.recover {
+      case error =>
+        logger.info(s"Kubernetes is not available", error)
+        false
+    }.get
+
+  def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration]): Try[Unit] = {
+    val cacertsFile          = jobDirectory.resolve("input").resolve("cacerts")
+    val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString()
+    // make the default longer than likely values, but still not infinite
+    val timeout_or_default = timeout getOrElse new FiniteDuration(8, TimeUnit.HOURS)
+    // https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
+    // FIXME: this collapses case, jeopardizing the uniqueness of the
+    // identifier. LDH: lowercase, digits, hyphens.
+    val ldh_jobid = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
+    val kjobName  = "neuron-job-" + ldh_jobid
+    val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
+      .withClaimName(persistentVolumeClaimName)
+      .withReadOnly(false)
+      .build()
+    val kjob1 = new KJobBuilder()
+      .withApiVersion("batch/v1")
+      .withNewMetadata()
+      .withName(kjobName)
+      .withLabels(
+        Map(
+          "cortex-job-id"     -> job.id,
+          "cortex-worker-id"  -> job.workerId(),
+          "cortex-neuron-job" -> "true"
+        ).asJava
+      )
+      .endMetadata()
+      .withNewSpec()
+      .withNewTemplate()
+      .withNewSpec()
+      .addNewVolume()
+      .withName("job-directory")
+      .withPersistentVolumeClaim(pvcvs)
+      .endVolume()
+      .addNewContainer()
+      .withName("neuron")
+      .withImage(dockerImage)
+      .withArgs("/job")
+      .addNewEnv()
+      .withName("CORTEX_JOB_FOLDER")
+      .withValue(relativeJobDirectory)
+      .endEnv()
+    val kjob2 = if (Files.exists(cacertsFile)) {
+      kjob1
+        .addNewEnv()
+        .withName("REQUESTS_CA_BUNDLE")
+        .withValue("/job/input/cacerts")
+        .endEnv()
+    } else {
+      kjob1
+    }
+    val kjob3 = kjob2
+      .addNewVolumeMount()
+      .withName("job-directory")
+      .withSubPathExpr("$(CORTEX_JOB_FOLDER)/input")
+      .withMountPath("/job/input")
+      .withReadOnly(true)
+      .endVolumeMount()
+      .addNewVolumeMount()
+      .withName("job-directory")
+      .withSubPathExpr("$(CORTEX_JOB_FOLDER)/output")
+      .withMountPath("/job/output")
+      .withReadOnly(false)
+      .endVolumeMount()
+      .endContainer()
+      .withRestartPolicy("Never")
+      .endSpec()
+      .endTemplate()
+      .endSpec()
+      .build()
+
+    val execution = Try {
+      val created_kjob = client.batch().jobs().create(kjob3)
+      val created_env = created_kjob
+        .getSpec().getTemplate().getSpec().getContainers().get(0)
+        .getEnv().asScala
+      logger.info(
+        s"Created Kubernetes Job ${created_kjob.getMetadata().getName()}\n" +
+          s"  timeout: ${timeout_or_default.toString}\n" +
+          s"  image  : $dockerImage\n" +
+          s"  mount  : pvc ${persistentVolumeClaimName} subdir ${relativeJobDirectory} as /job" +
+          created_env.map(ev => s"\n  env    : ${ev.getName()} = ${ev.getValue()}").mkString
+      )
+      val ended_kjob = client
+        .batch()
+        .jobs()
+        .withLabel("cortex-job-id", job.id)
+        .waitUntilCondition(
+          x =>
+            Option(x)
+              .flatMap(j => Option(j.getStatus))
+              .exists(_.getConditions.asScala.exists(c => c.getType == "Complete" || c.getType == "Failed")),
+          timeout_or_default.length,
+          timeout_or_default.unit
+        )
+      if (ended_kjob != null) {
+        logger.info(
+          s"Kubernetes Job ${ended_kjob.getMetadata().getName()} " +
+            s"(for job ${job.id}) status is now ${ended_kjob.getStatus().toString()}"
+        )
+      } else {
+        logger.info(s"Kubernetes Job for ${job.id} no longer exists")
+      }
+    }
+    // let's find the job by the attribute we know is fundamentally
+    // unique, rather than one constructed from it
+    val deleted = client.batch().jobs().withLabel("cortex-job-id", job.id).delete()
+    if (deleted) {
+      logger.info(s"Deleted Kubernetes Job for job ${job.id}")
+    } else {
+      logger.info(s"While trying to delete the Kubernetes Job for ${job.id}, the job was not found; this is OK")
+    }
+    execution
+  }
+}
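Every Job created by `run` carries the labels `cortex-job-id`, `cortex-worker-id` and `cortex-neuron-job=true`, and cleanup happens by label rather than by the constructed (and, per the FIXME, potentially non-unique) name. If Cortex dies between create and delete, leftover Jobs can therefore be listed and removed with the same fabric8 calls the service itself uses. A minimal sketch follows; `NeuronJobCleanup` is a hypothetical helper, not part of this change, and it assumes the same zero-argument client setup as the default constructor above (kubeconfig or in-cluster service account):

    import scala.collection.JavaConverters._
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object NeuronJobCleanup extends App {
      // Resolves the cluster the same way K8sJobRunnerSrv does.
      val client = new DefaultKubernetesClient()
      // Every Job the runner creates carries this label, so the selector matches exactly the neuron jobs.
      val neuronJobs = client.batch().jobs().withLabel("cortex-neuron-job", "true")
      neuronJobs.list().getItems.asScala.foreach { j =>
        println(s"${j.getMetadata.getName} cortex-job-id=${j.getMetadata.getLabels.get("cortex-job-id")}")
      }
      // Same by-label deletion the runner performs after each job.
      neuronJobs.delete()
      client.close()
    }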
diff --git a/app/org/thp/cortex/services/WorkerSrv.scala b/app/org/thp/cortex/services/WorkerSrv.scala
index 9de59b0bd..7db3d6940 100644
--- a/app/org/thp/cortex/services/WorkerSrv.scala
+++ b/app/org/thp/cortex/services/WorkerSrv.scala
@@ -175,10 +175,14 @@ class WorkerSrv @Inject() (
       workerDefinitions.filter {
         case w if w.command.isDefined && jobRunnerSrv.processRunnerIsEnable => true
         case w if w.dockerImage.isDefined && jobRunnerSrv.dockerRunnerIsEnable => true
+        case w if w.dockerImage.isDefined && jobRunnerSrv.k8sRunnerIsEnable => true
         case w =>
           val reason =
             if (w.command.isDefined) "process runner is disabled"
-            else if (w.dockerImage.isDefined) "Docker runner is disabled"
+            else if (w.dockerImage.isDefined && !jobRunnerSrv.dockerRunnerIsEnable)
+              "Docker runner is disabled"
+            else if (w.dockerImage.isDefined && !jobRunnerSrv.k8sRunnerIsEnable)
+              "Kubernetes runner is disabled"
             else "it doesn't have image nor command"
 
           logger.warn(s"$workerType ${w.name} is disabled because $reason")
diff --git a/build.sbt b/build.sbt
index 94b60e4ae..7086e1eaa 100644
--- a/build.sbt
+++ b/build.sbt
@@ -29,6 +29,7 @@ lazy val cortex = (project in file("."))
       Dependencies.reflections,
       Dependencies.zip4j,
       Dependencies.dockerClient,
+      Dependencies.k8sClient,
       Dependencies.akkaCluster,
       Dependencies.akkaClusterTyped
     ),
diff --git a/conf/reference.conf b/conf/reference.conf
index a96f0f1e1..d15484389 100644
--- a/conf/reference.conf
+++ b/conf/reference.conf
@@ -11,7 +11,7 @@ cache {
 
 job {
   timeout = 30 minutes
-  runners = [docker, process]
+  runners = [kubernetes, docker, process]
   directory = ${java.io.tmpdir}
   dockerDirectory = ${job.directory}
   keepJobFolder = false
diff --git a/package/docker/entrypoint b/package/docker/entrypoint
index 68241d11b..bf50926b7 100755
--- a/package/docker/entrypoint
+++ b/package/docker/entrypoint
@@ -20,6 +20,7 @@ SHOW_SECRET=${show_secret:-0}
 DAEMON_USER=${daemon_user:-cortex}
 JOB_DIRECTORY=${job_directory:-/tmp/cortex-jobs}
 DOCKER_JOB_DIRECTORY=${docker_job_directory:-}
+KUBERNETES_JOB_PVC=${kubernetes_job_pvc:-}
 
 function usage {
 	cat <<- _EOF_
@@ -33,6 +34,7 @@ function usage {
 	  --show-secret             | show the generated secret
 	  --job-directory           | use this directory to store job files
 	  --docker-job-directory    | indicate the job directory in the host (not inside container)
+	  --kubernetes-job-pvc      | indicate the ReadWriteMany persistent volume claim holding the job directory
 	  --analyzer-url            | where analyzers are located (url or path)
 	  --responder-url           | where responders are located (url or path)
 	  --start-docker            | start a internal docker (inside container) to run analyzers/responders
@@ -56,6 +58,7 @@ do
 		"--show-secret")            SHOW_SECRET=1;;
 		"--job-directory")          shift; JOB_DIRECTORY=$1;;
 		"--docker-job-directory")   shift; DOCKER_JOB_DIRECTORY=$1;;
+		"--kubernetes-job-pvc")     shift; KUBERNETES_JOB_PVC=$1;;
 		"--analyzer-path")          echo "--analyzer-path is deprecated, please use --analyzer-url"
 		                            shift; ANALYZER_URLS+=("$1");;
 		"--responder-path")         echo "--responder-path is deprecated, please use --responder-url"
@@ -112,6 +115,7 @@ then
 
 	test -n "$JOB_DIRECTORY"        && echo "job.directory=\"$JOB_DIRECTORY\""              >> "$CONFIG_FILE"
 	test -n "$DOCKER_JOB_DIRECTORY" && echo "job.dockerDirectory=\"$DOCKER_JOB_DIRECTORY\"" >> "$CONFIG_FILE"
+	test -n "$KUBERNETES_JOB_PVC"   && echo "job.kubernetes.persistentVolumeClaimName=\"$KUBERNETES_JOB_PVC\"" >> "$CONFIG_FILE"
 
 	function join_urls {
 		echo -n "\"$1\""
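With the entrypoint change, the PVC name can be passed either as a flag or through the lowercase `kubernetes_job_pvc` environment variable that seeds its default; either way it ends up as `job.kubernetes.persistentVolumeClaimName` in the generated config. A sketch of a container invocation (the PVC name and job directory below are examples, not defaults):

    # as explicit arguments, e.g. in the container spec of a Cortex pod
    entrypoint --job-directory /opt/cortex/jobs --kubernetes-job-pvc cortex-jobs

    # equivalent via the environment variables read at the top of the script
    job_directory=/opt/cortex/jobs kubernetes_job_pvc=cortex-jobs entrypoint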
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 7d07b526f..89715f599 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -21,6 +21,7 @@ object Dependencies {
   val zip4j = "net.lingala.zip4j" % "zip4j" % "2.11.5"
   val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6"
   val dockerClient = "com.spotify" % "docker-client" % "8.16.0"
+  val k8sClient = "io.fabric8" % "kubernetes-client" % "5.0.2"
   val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion
   val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion
 }
diff --git a/www/src/app/pages/analyzers/analyzers.service.js b/www/src/app/pages/analyzers/analyzers.service.js
index 64a3f2631..1dd732c3d 100644
--- a/www/src/app/pages/analyzers/analyzers.service.js
+++ b/www/src/app/pages/analyzers/analyzers.service.js
@@ -43,6 +43,7 @@ export default class AnalyzerService {
 
       if (def.dockerImage && def.dockerImage !== null) {
         def.runners.push('Docker');
+        def.runners.push('Kubernetes');
       }
     });
 
@@ -232,4 +233,4 @@ export default class AnalyzerService {
     return this.$http.post('./api/analyzer/' + id + '/run', postData);
   }
 }
-}
\ No newline at end of file
+}