Skip to content

Commit cbb9d77

Browse files
committed
chore(workload-api): make workload status transitions atomic (#16507)
1 parent 79ad476 commit cbb9d77

File tree

8 files changed

+549
-46
lines changed

8 files changed

+549
-46
lines changed

airbyte-commons-workload/src/main/kotlin/io/airbyte/workload/repository/WorkloadRepository.kt

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,29 @@ interface WorkloadRepository : PageableRepository<Workload, String> {
7373
createdBefore: OffsetDateTime?,
7474
): List<Workload>
7575

76+
/**
77+
* Cancel transitions a workload into a cancelled state if the workload was non-terminal.
78+
* Cancel returns the workload if the status was just updated to cancelled.
* Non-terminal means the current status is one of pending, claimed, launched or running
* (the statuses listed in the WHERE clause); a row in any other status is left unchanged.
* Besides the status, the statement stores [reason] and [source] in termination_reason and
* termination_source, clears the deadline and sets updated_at to now().
* The check and the write are a single UPDATE statement rather than a read followed by a write.
* NOTE(review): presumably returns null when no row matched the WHERE clause — confirm.
79+
*/
80+
@Query(
81+
"""
82+
UPDATE workload
83+
SET
84+
status = 'cancelled',
85+
termination_reason = :reason,
86+
termination_source = :source,
87+
deadline = null,
88+
updated_at = now()
89+
WHERE id = :id AND status in ('pending', 'claimed', 'launched', 'running')
90+
RETURNING *
91+
""",
92+
)
93+
fun cancel(
94+
@Id id: String,
95+
reason: String?,
96+
source: String?,
97+
): Workload?
98+
7699
/**
77100
* Claim transitions a workload into a claimed state and updates the deadline if the workload was pending.
78101
* Claim returns the workload if it is in a valid claimed status by the dataplane (either from this call or if it was already claimed).
@@ -98,6 +121,48 @@ interface WorkloadRepository : PageableRepository<Workload, String> {
98121
deadline: OffsetDateTime,
99122
): Workload?
100123

124+
/**
125+
* Fail transitions a workload into a failure state if the workload was non-terminal.
126+
* Fail returns the workload if the status was just updated to failure.
* Non-terminal means the current status is one of pending, claimed, launched or running
* (the statuses listed in the WHERE clause); a row in any other status is left unchanged.
* Besides the status, the statement stores [reason] and [source] in termination_reason and
* termination_source, clears the deadline and sets updated_at to now().
* The check and the write are a single UPDATE statement rather than a read followed by a write.
* NOTE(review): presumably returns null when no row matched the WHERE clause — confirm.
127+
*/
128+
@Query(
129+
"""
130+
UPDATE workload
131+
SET
132+
status = 'failure',
133+
termination_reason = :reason,
134+
termination_source = :source,
135+
deadline = null,
136+
updated_at = now()
137+
WHERE id = :id AND status in ('pending', 'claimed', 'launched', 'running')
138+
RETURNING *
139+
""",
140+
)
141+
fun fail(
142+
@Id id: String,
143+
reason: String?,
144+
source: String?,
145+
): Workload?
146+
147+
/**
148+
* Succeed transitions a workload into a success state if the workload was non-terminal.
149+
* Succeed returns the workload if the status was just updated to success.
* Non-terminal means the current status is one of pending, claimed, launched or running
* (the statuses listed in the WHERE clause); a row in any other status is left unchanged.
* Besides the status, the statement clears the deadline and sets updated_at to now();
* termination_reason and termination_source are not written by this statement.
* The check and the write are a single UPDATE statement rather than a read followed by a write.
* NOTE(review): presumably returns null when no row matched the WHERE clause — confirm.
150+
*/
151+
@Query(
152+
"""
153+
UPDATE workload
154+
SET
155+
status = 'success',
156+
deadline = null,
157+
updated_at = now()
158+
WHERE id = :id AND status in ('pending', 'claimed', 'launched', 'running')
159+
RETURNING *
160+
""",
161+
)
162+
fun succeed(
163+
@Id id: String,
164+
): Workload?
165+
101166
fun update(
102167
@Id id: String,
103168
status: WorkloadStatus,

airbyte-commons-workload/src/main/kotlin/io/airbyte/workload/services/WorkloadService.kt

Lines changed: 128 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ package io.airbyte.workload.services
77
import io.airbyte.commons.enums.convertTo
88
import io.airbyte.config.WorkloadPriority
99
import io.airbyte.config.WorkloadType
10+
import io.airbyte.featureflag.Empty
11+
import io.airbyte.featureflag.FeatureFlagClient
12+
import io.airbyte.featureflag.UseAtomicWorkloadStateTransitions
1013
import io.airbyte.workload.common.DefaultDeadlineValues
1114
import io.airbyte.workload.repository.WorkloadQueueRepository
1215
import io.airbyte.workload.repository.WorkloadRepository
@@ -43,6 +46,7 @@ class WorkloadService(
4346
private val workloadQueueRepository: WorkloadQueueRepository,
4447
private val signalSender: SignalSender,
4548
private val defaultDeadlineValues: DefaultDeadlineValues,
49+
private val featureFlagClient: FeatureFlagClient,
4650
) {
4751
fun createWorkload(
4852
workloadId: String,
@@ -113,25 +117,49 @@ class WorkloadService(
113117
source: String?,
114118
reason: String?,
115119
) {
116-
val workload = getWorkload(workloadId)
117-
118-
when (workload.status) {
119-
WorkloadStatus.PENDING, WorkloadStatus.LAUNCHED, WorkloadStatus.CLAIMED, WorkloadStatus.RUNNING -> {
120-
workloadRepository.update(
121-
workloadId,
122-
WorkloadStatus.CANCELLED,
123-
source,
124-
reason,
125-
null,
126-
)
120+
if (featureFlagClient.boolVariation(UseAtomicWorkloadStateTransitions, Empty)) {
121+
val workload = workloadRepository.cancel(workloadId, reason = reason, source = source)
122+
if (workload != null) {
123+
workloadQueueRepository.ackWorkloadQueueItem(workloadId)
127124
signalSender.sendSignal(workload.type, workload.signalInput)
125+
} else {
126+
workloadRepository
127+
.findById(workloadId)
128+
.map { w ->
129+
when (w.status) {
130+
WorkloadStatus.FAILURE, WorkloadStatus.SUCCESS -> throw InvalidStatusTransitionException(
131+
"Cannot cancel a workload in either success or failure status. Workload id: $workloadId has status: ${w.status}",
132+
)
133+
WorkloadStatus.CANCELLED ->
134+
logger.info {
135+
"Workload $workloadId is already cancelled. Cancelling an already cancelled workload is a noop"
136+
}
137+
else -> logger.error { "Cancelling workload $workloadId failed to update its status, status is ${w.status}" }
138+
}
139+
}
140+
}
141+
} else {
142+
val workload = getWorkload(workloadId)
128143

129-
workloadQueueRepository.ackWorkloadQueueItem(workloadId)
144+
when (workload.status) {
145+
WorkloadStatus.PENDING, WorkloadStatus.LAUNCHED, WorkloadStatus.CLAIMED, WorkloadStatus.RUNNING -> {
146+
workloadRepository.update(
147+
workloadId,
148+
WorkloadStatus.CANCELLED,
149+
source,
150+
reason,
151+
null,
152+
)
153+
signalSender.sendSignal(workload.type, workload.signalInput)
154+
155+
workloadQueueRepository.ackWorkloadQueueItem(workloadId)
156+
}
157+
158+
WorkloadStatus.CANCELLED -> logger.info { "Workload $workloadId is already cancelled. Cancelling an already cancelled workload is a noop" }
159+
else -> throw InvalidStatusTransitionException(
160+
"Cannot cancel a workload in either success or failure status. Workload id: $workloadId has status: ${workload.status}",
161+
)
130162
}
131-
WorkloadStatus.CANCELLED -> logger.info { "Workload $workloadId is already cancelled. Cancelling an already cancelled workload is a noop" }
132-
else -> throw InvalidStatusTransitionException(
133-
"Cannot cancel a workload in either success or failure status. Workload id: $workloadId has status: ${workload.status}",
134-
)
135163
}
136164
}
137165

@@ -140,25 +168,95 @@ class WorkloadService(
140168
source: String?,
141169
reason: String?,
142170
) {
143-
val workload = getWorkload(workloadId)
144-
when (workload.status) {
145-
WorkloadStatus.PENDING, WorkloadStatus.CLAIMED, WorkloadStatus.LAUNCHED, WorkloadStatus.RUNNING -> {
146-
workloadRepository.update(
147-
workloadId,
148-
WorkloadStatus.FAILURE,
149-
source,
150-
reason,
151-
null,
152-
)
171+
if (featureFlagClient.boolVariation(UseAtomicWorkloadStateTransitions, Empty)) {
172+
val workload = workloadRepository.fail(workloadId, reason = reason, source = source)
173+
if (workload != null) {
174+
workloadQueueRepository.ackWorkloadQueueItem(workloadId)
153175
signalSender.sendSignal(workload.type, workload.signalInput)
176+
} else {
177+
workloadRepository
178+
.findById(workloadId)
179+
.map { w ->
180+
when (w.status) {
181+
WorkloadStatus.CANCELLED, WorkloadStatus.SUCCESS -> throw InvalidStatusTransitionException(
182+
"Cannot fail a workload in either canceled or success status. Workload id: $workloadId has status: ${w.status}",
183+
)
184+
WorkloadStatus.FAILURE ->
185+
logger.info {
186+
"Workload $workloadId is already failed. Failing an already failed workload is a noop"
187+
}
188+
else -> logger.error { "Failed workload $workloadId failed to update its status, status is ${w.status}" }
189+
}
190+
}
191+
}
192+
} else {
193+
val workload = getWorkload(workloadId)
194+
when (workload.status) {
195+
WorkloadStatus.PENDING, WorkloadStatus.CLAIMED, WorkloadStatus.LAUNCHED, WorkloadStatus.RUNNING -> {
196+
workloadRepository.update(
197+
workloadId,
198+
WorkloadStatus.FAILURE,
199+
source,
200+
reason,
201+
null,
202+
)
203+
signalSender.sendSignal(workload.type, workload.signalInput)
204+
205+
workloadQueueRepository.ackWorkloadQueueItem(workloadId)
206+
}
154207

208+
WorkloadStatus.FAILURE -> logger.info { "Workload $workloadId is already marked as failed. Failing an already failed workload is a noop" }
209+
else -> throw InvalidStatusTransitionException(
210+
"Tried to fail a workload that is not active. Workload id: $workloadId has status: ${workload.status}",
211+
)
212+
}
213+
}
214+
}
215+
216+
/**
 * Marks the workload identified by [workloadId] as succeeded.
 *
 * The path taken depends on the `UseAtomicWorkloadStateTransitions` feature flag:
 *
 * - Flag on: the transition is delegated to `workloadRepository.succeed`. When it returns a workload,
 *   the queue item is acked and a signal is sent. When it returns null, the workload is looked up to
 *   decide how to report the outcome: success is logged as a noop, cancelled/failure throws, and any
 *   other status is logged as an error.
 * - Flag off: legacy read-then-update path. Only claimed, launched and running workloads are updated
 *   (followed by a signal); success is logged as a noop; every other status throws.
 *
 * @param workloadId id of the workload to mark as succeeded.
 * @throws InvalidStatusTransitionException when the workload is in a status that cannot move to success.
 */
fun succeedWorkload(workloadId: String) {
  if (featureFlagClient.boolVariation(UseAtomicWorkloadStateTransitions, Empty)) {
    val workload = workloadRepository.succeed(workloadId)
    if (workload != null) {
      workloadQueueRepository.ackWorkloadQueueItem(workloadId)
      signalSender.sendSignal(workload.type, workload.signalInput)
    } else {
      // Nothing was updated. Use the same lookup as the legacy branch so that an unknown workload id
      // is reported the same way in both branches instead of being silently ignored.
      val current = getWorkload(workloadId)
      when (current.status) {
        WorkloadStatus.CANCELLED, WorkloadStatus.FAILURE -> throw InvalidStatusTransitionException(
          "Cannot succeed a workload in either canceled or failure status. Workload id: $workloadId has status: ${current.status}",
        )

        WorkloadStatus.SUCCESS ->
          logger.info {
            "Workload $workloadId is already successful. Succeeding an already successful workload is a noop"
          }

        // The workload is in a status the repository call did not update; surface it in the logs.
        else -> logger.error { "Succeeding workload $workloadId failed to update its status, status is ${current.status}" }
      }
    }
  } else {
    val workload = getWorkload(workloadId)

    when (workload.status) {
      WorkloadStatus.CLAIMED, WorkloadStatus.LAUNCHED, WorkloadStatus.RUNNING -> {
        workloadRepository.update(
          workloadId,
          WorkloadStatus.SUCCESS,
          null,
        )
        signalSender.sendSignal(workload.type, workload.signalInput)
      }

      WorkloadStatus.SUCCESS ->
        logger.info { "Workload $workloadId is already marked as succeeded. Succeeding an already succeeded workload is a noop" }

      else -> throw InvalidStatusTransitionException(
        "Tried to succeed a workload that is not active. Workload id: $workloadId has status: ${workload.status}",
      )
    }
  }
}
164262

0 commit comments

Comments
 (0)