Skip to content

Commit 75564bd

Browse files
Ability to terminate jobs given a specific prefix
1 parent 51c37eb commit 75564bd

File tree

4 files changed

+116
-1
lines changed

4 files changed

+116
-1
lines changed

src/AWSBatch.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export BatchJob, ComputeEnvironment, BatchEnvironmentError, BatchJobError
1414
export JobQueue, JobDefinition, JobState, LogEvent
1515
export run_batch, describe, status, status_reason, wait, log_events, isregistered, register, deregister
1616
export list_job_queues, list_job_definitions, create_compute_environment, create_job_queue
17+
export terminate_jobs
1718

1819

1920
const logger = getlogger(@__MODULE__)
@@ -29,6 +30,7 @@ include("job_queue.jl")
2930
include("job_state.jl")
3031
include("job_definition.jl")
3132
include("batch_job.jl")
33+
include("utilities.jl")
3234

3335

3436
"""

src/utilities.jl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
_incomplete_job(job) = !(job["status"] in ["SUCCEEDED", "FAILED"])
2+
_suffix_asterisk(prefix) = endswith(prefix, "*") ? prefix : string(prefix, "*")
3+
4+
function _get_job_ids(job_queue, prefix)
5+
# Check if the prefix provided ends w/ '*", if not append it
6+
prefix = _suffix_asterisk(prefix)
7+
8+
resp = @mock Batch.list_jobs(
9+
Dict(
10+
"jobQueue" => job_queue,
11+
"filters" => [Dict("name" => "JOB_NAME", "values" => [prefix])],
12+
),
13+
)
14+
jobs = resp["jobSummaryList"]
15+
16+
while haskey(resp, "nextToken")
17+
resp = @mock Batch.list_jobs(
18+
Dict("jobQueue" => job_queue, "nextToken" => resp["nextToken"])
19+
)
20+
append!(jobs, resp["jobSummaryList"])
21+
end
22+
23+
filter!(j -> _incomplete_job(j), jobs)
24+
25+
return [j["jobId"] for j in jobs]
26+
end
27+
28+
"""
29+
terminate_jobs()
30+
31+
Terminate all Batch jobs with a given prefix.
32+
33+
# Arguments
34+
- `job_queue::JobQueue`: JobQueue where the jobs reside
35+
- `prefix::AbstractString`: Prefix for the jobs
36+
37+
# Keywords
38+
- `reason::AbstractString=""`: Reason to terminate the jobs
39+
40+
# Return
41+
- `Array{String}`: Terminated Job Ids
42+
"""
43+
function terminate_jobs(
44+
job_queue::AbstractString, prefix::AbstractString; reason::AbstractString=""
45+
)
46+
job_ids = _get_job_ids(job_queue, prefix)
47+
48+
for job_id in job_ids
49+
@mock Batch.terminate_job(job_id, reason)
50+
end
51+
52+
return job_ids
53+
end
54+
55+
function terminate_jobs(
56+
job_queue::JobQueue, prefix::AbstractString; reason::AbstractString=""
57+
)
58+
return terminate_jobs(job_queue.arn, prefix; reason=reason)
59+
end

test/runtests.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using AWS
2+
using AWS.AWSExceptions: AWSException
23
using AWSTools.CloudFormation: stack_output
34
using AWSBatch
45
using Dates
@@ -8,7 +9,7 @@ using Memento.TestUtils: @test_log, @test_nolog
89
using Mocking
910
using Test
1011

11-
using AWS.AWSExceptions: AWSException
12+
@service Batch
1213

1314
Mocking.activate()
1415

@@ -81,6 +82,7 @@ include("mock.jl")
8182
include("job_state.jl")
8283
include("batch_job.jl")
8384
include("run_batch.jl")
85+
include("utilities.jl")
8486
end
8587
else
8688
warn(logger, "Skipping \"local\" tests. Set `ENV[\"TESTS\"] = \"local\"` to run.")

test/utilities.jl

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
list_jobs_patch = @patch function AWSBatch.Batch.list_jobs(params)
2+
response = Dict{String, Any}(
3+
"jobSummaryList" => [
4+
Dict(
5+
"jobId" => "1",
6+
"status" => "SUCCEEDED"
7+
),
8+
Dict(
9+
"jobId" => "2",
10+
"status" => "FAILED"
11+
),
12+
Dict(
13+
"jobId" => "3",
14+
"status" => "RUNNABLE"
15+
),
16+
]
17+
)
18+
19+
if !haskey(params, "nextToken")
20+
response["nextToken"] = "nextToken"
21+
end
22+
23+
return response
24+
end
25+
26+
terminate_job_patch = @patch AWSBatch.Batch.terminate_job(a...) = Dict()
27+
28+
@testset "_incomplete_job" begin
29+
@test_throws KeyError AWSBatch._incomplete_job(Dict())
30+
@test !AWSBatch._incomplete_job(Dict("status" => "SUCCEEDED"))
31+
@test !AWSBatch._incomplete_job(Dict("status" => "FAILED"))
32+
@test AWSBatch._incomplete_job(Dict("status" => "FOOBAR"))
33+
end
34+
35+
@testset "_suffix_asterisk" begin
36+
@test AWSBatch._suffix_asterisk("foobar") == "foobar*"
37+
@test AWSBatch._suffix_asterisk("foobar*") == "foobar*"
38+
end
39+
40+
@testset "_get_job_ids" begin
41+
apply(list_jobs_patch) do
42+
response = AWSBatch._get_job_ids("foo", "bar")
43+
@test response == ["3", "3"]
44+
end
45+
end
46+
47+
@testset "terminate_jobs" begin
48+
apply([list_jobs_patch, terminate_job_patch]) do
49+
response = terminate_jobs("foo", "bar")
50+
@test response == ["3", "3"]
51+
end
52+
end

0 commit comments

Comments
 (0)