Skip to content

Commit 7c74e02

Browse files
benmoriceauclaude
andcommitted
fix: detect source container OOM by monitoring pipe closure (#18381)
Co-authored-by: Claude <[email protected]>
1 parent 140e64e commit 7c74e02

File tree

4 files changed

+272
-21
lines changed

4 files changed

+272
-21
lines changed

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/worker/io/LocalContainerAirbyteDestination.kt

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,25 @@ class LocalContainerAirbyteDestination(
3535
private val containerIOHandle: ContainerIOHandle,
3636
private val containerLogMdcBuilder: MdcScope.Builder,
3737
private val flushImmediately: Boolean = false,
38+
private val exitCodeWaitSeconds: Long = EXIT_CODE_WAIT_SECONDS,
3839
) : AirbyteDestination {
40+
companion object {
41+
// Fallback exit code when the destination container is killed (e.g., OOM) and no exit code file is written
42+
const val FALLBACK_EXIT_CODE = 1
43+
44+
// Time to wait for exit code file after pipe closes (to handle race condition between pipe close and exit code write)
45+
const val EXIT_CODE_WAIT_SECONDS = 10L
46+
}
47+
3948
private val inputHasEnded = AtomicBoolean(false)
4049
private lateinit var messageIterator: Iterator<AirbyteMessage>
4150
private lateinit var writer: AirbyteMessageBufferedWriter<AirbyteMessage>
4251

52+
// Tracks when the output stream iterator is exhausted (pipes closed).
53+
// This is important for detecting OOM kills where the container dies without writing an exit code file.
54+
@Volatile
55+
private var outputStreamExhausted = false
56+
4357
override fun close() {
4458
emitDestinationMessageCountMetrics()
4559

@@ -104,14 +118,55 @@ class LocalContainerAirbyteDestination(
104118
}
105119

106120
override val isFinished: Boolean
107-
/*
121+
/**
108122
* As this check is done on every message read, it is important for this operation to be efficient.
109123
* Short circuit early to avoid checking the underlying process. Note: hasNext is blocking.
124+
*
125+
* We consider the destination finished in one case:
126+
* 1. The iterator has no more messages
110127
*/
111-
get() = !messageIterator.hasNext() && containerIOHandle.exitCodeExists()
128+
get() {
129+
if (outputStreamExhausted) {
130+
return true
131+
}
132+
val hasNext = messageIterator.hasNext()
133+
if (!hasNext) {
134+
outputStreamExhausted = true
135+
return true
136+
}
137+
return false
138+
}
112139

113140
override val exitValue: Int
114-
get() = containerIOHandle.getExitCode()
141+
/**
142+
* Returns the exit code of the destination container.
143+
* If no exit code file exists but the output stream was exhausted (pipes closed),
144+
* this waits briefly for the exit code file to appear (to handle the race condition
145+
* between pipe close and exit code file write), then returns a fallback exit code
146+
* if the file still doesn't exist.
147+
*/
148+
get() {
149+
if (containerIOHandle.exitCodeExists()) {
150+
return containerIOHandle.getExitCode()
151+
}
152+
if (outputStreamExhausted) {
153+
// Wait for exit code file - there's a race condition where the pipe closes
154+
// before the shell script writes the exit code file
155+
try {
156+
Thread.sleep(exitCodeWaitSeconds * 1000)
157+
} catch (e: InterruptedException) {
158+
Thread.currentThread().interrupt()
159+
}
160+
if (containerIOHandle.exitCodeExists()) {
161+
return containerIOHandle.getExitCode()
162+
}
163+
logger.error {
164+
"No exit code file found after waiting ${exitCodeWaitSeconds}s. Container may have been killed (OOM or other signal). Returning fallback exit code $FALLBACK_EXIT_CODE."
165+
}
166+
return FALLBACK_EXIT_CODE
167+
}
168+
return containerIOHandle.getExitCode() // This will throw IllegalStateException as expected
169+
}
115170

116171
override fun attemptRead(): Optional<AirbyteMessage> {
117172
val m = if (messageIterator.hasNext()) messageIterator.next() else null

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/worker/io/LocalContainerAirbyteSource.kt

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,23 @@ class LocalContainerAirbyteSource(
2929
private val messageMetricsTracker: MessageMetricsTracker,
3030
private val containerIOHandle: ContainerIOHandle,
3131
private val containerLogMdcBuilder: MdcScope.Builder,
32+
private val exitCodeWaitSeconds: Long = EXIT_CODE_WAIT_SECONDS,
3233
) : AirbyteSource {
34+
companion object {
35+
// Fallback exit code when the source container is killed (e.g., OOM) and no exit code file is written
36+
const val FALLBACK_EXIT_CODE = 1
37+
38+
// Time to wait for exit code file after pipe closes (to handle race condition between pipe close and exit code write)
39+
const val EXIT_CODE_WAIT_SECONDS = 10L
40+
}
41+
3342
private lateinit var messageIterator: Iterator<AirbyteMessage>
3443

44+
// Tracks when the output stream iterator is exhausted (pipes closed).
45+
// This is important for detecting OOM kills where the container dies without writing an exit code file.
46+
@Volatile
47+
private var outputStreamExhausted = false
48+
3549
override fun close() {
3650
messageMetricsTracker.flushSourceReadCountMetric()
3751
val terminationResult = containerIOHandle.terminate()
@@ -72,14 +86,55 @@ class LocalContainerAirbyteSource(
7286
}
7387

7488
override val isFinished: Boolean
75-
/*
89+
/**
7690
* As this check is done on every message read, it is important for this operation to be efficient.
77-
* Short circuit early to avoid checking the underlying process. note: hasNext is blocking.
91+
* Short circuit early to avoid checking the underlying process. Note: hasNext is blocking.
92+
*
93+
* We consider the source finished in one case:
94+
* 1. The iterator has no more messages
7895
*/
79-
get() = !messageIterator.hasNext() && containerIOHandle.exitCodeExists()
96+
get() {
97+
if (outputStreamExhausted) {
98+
return true
99+
}
100+
val hasNext = messageIterator.hasNext()
101+
if (!hasNext) {
102+
outputStreamExhausted = true
103+
return true
104+
}
105+
return false
106+
}
80107

81108
override val exitValue: Int
82-
get() = containerIOHandle.getExitCode()
109+
/**
110+
* Returns the exit code of the source container.
111+
* If no exit code file exists but the output stream was exhausted (pipes closed),
112+
* this waits briefly for the exit code file to appear (to handle the race condition
113+
* between pipe close and exit code file write), then returns a fallback exit code
114+
* if the file still doesn't exist.
115+
*/
116+
get() {
117+
if (containerIOHandle.exitCodeExists()) {
118+
return containerIOHandle.getExitCode()
119+
}
120+
if (outputStreamExhausted) {
121+
// Wait for exit code file - there's a race condition where the pipe closes
122+
// before the shell script writes the exit code file
123+
try {
124+
Thread.sleep(exitCodeWaitSeconds * 1000)
125+
} catch (e: InterruptedException) {
126+
Thread.currentThread().interrupt()
127+
}
128+
if (containerIOHandle.exitCodeExists()) {
129+
return containerIOHandle.getExitCode()
130+
}
131+
logger.error {
132+
"No exit code file found after waiting ${exitCodeWaitSeconds}s. Container may have been killed (OOM or other signal). Returning fallback exit code $FALLBACK_EXIT_CODE."
133+
}
134+
return FALLBACK_EXIT_CODE
135+
}
136+
return containerIOHandle.getExitCode() // This will throw IllegalStateException as expected
137+
}
83138

84139
override fun attemptRead(): Optional<AirbyteMessage> {
85140
val m = if (messageIterator.hasNext()) messageIterator.next() else null

airbyte-container-orchestrator/src/test/kotlin/io/airbyte/container/orchestrator/worker/io/LocalContainerAirbyteDestinationTest.kt

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ internal class LocalContainerAirbyteDestinationTest {
213213
}
214214

215215
@Test
216-
internal fun testDestinationIsFinished() {
216+
internal fun testDestinationIsFinishedWithExitCode() {
217+
// Test normal finish with exit code file present
217218
val iterator =
218219
mockk<Iterator<AirbyteMessage>> {
219220
every { hasNext() } returns false
@@ -235,15 +236,31 @@ internal class LocalContainerAirbyteDestinationTest {
235236
jobRoot = jobRoot,
236237
)
237238
assertEquals(true, destination.isFinished)
239+
}
238240

239-
every { iterator.hasNext() } returns true
240-
assertEquals(false, destination.isFinished)
241+
@Test
242+
internal fun testDestinationIsNotFinishedWhileHasMessages() {
243+
// Test that destination is not finished while there are still messages
244+
val iterator =
245+
mockk<Iterator<AirbyteMessage>> {
246+
every { hasNext() } returns true
247+
}
248+
every { stream.iterator() } returns iterator
241249

242-
every { iterator.hasNext() } returns false
243-
exitValueFile.delete()
244-
assertEquals(false, destination.isFinished)
250+
val destination =
251+
LocalContainerAirbyteDestination(
252+
streamFactory = streamFactory,
253+
messageMetricsTracker = messageMetricsTracker,
254+
messageWriterFactory = messageWriterFactory,
255+
containerIOHandle = containerIOHandle,
256+
containerLogMdcBuilder = containerLogMdcBuilder,
257+
destinationTimeoutMonitor = destinationTimeoutMonitor,
258+
)
245259

246-
every { iterator.hasNext() } returns true
260+
destination.start(
261+
destinationConfig = workerDestinationConfig,
262+
jobRoot = jobRoot,
263+
)
247264
assertEquals(false, destination.isFinished)
248265
}
249266

@@ -304,6 +321,7 @@ internal class LocalContainerAirbyteDestinationTest {
304321
val mockedContainerIOHandle =
305322
mockk<ContainerIOHandle> {
306323
every { terminate() } returns true
324+
every { exitCodeExists() } returns true
307325
every { getExitCode() } returns exitValue
308326
}
309327

@@ -333,6 +351,7 @@ internal class LocalContainerAirbyteDestinationTest {
333351
mockk<ContainerIOHandle> {
334352
every { getErrInputStream() } returns mockk<InputStream>()
335353
every { terminate() } returns true
354+
every { exitCodeExists() } returns true
336355
every { getExitCode() } returns exitValue
337356
every { getInputStream() } returns mockk<InputStream>()
338357
every { getOutputStream() } returns mockk<OutputStream>()
@@ -371,6 +390,7 @@ internal class LocalContainerAirbyteDestinationTest {
371390
val mockedContainerIOHandle =
372391
mockk<ContainerIOHandle> {
373392
every { terminate() } returns true
393+
every { exitCodeExists() } returns true
374394
every { getExitCode() } returns exitValue
375395
}
376396
every { messageMetricsTracker.flushDestReadCountMetric() } returns Unit
@@ -399,6 +419,7 @@ internal class LocalContainerAirbyteDestinationTest {
399419
val mockedContainerIOHandle =
400420
mockk<ContainerIOHandle> {
401421
every { terminate() } returns false
422+
every { exitCodeExists() } returns true
402423
every { getExitCode() } returns exitValue
403424
}
404425
every { messageMetricsTracker.flushDestReadCountMetric() } returns Unit
@@ -427,6 +448,7 @@ internal class LocalContainerAirbyteDestinationTest {
427448
val mockedContainerIOHandle =
428449
mockk<ContainerIOHandle> {
429450
every { terminate() } returns false
451+
every { exitCodeExists() } returns true
430452
every { getExitCode() } returns exitValue
431453
}
432454
every { messageMetricsTracker.flushDestReadCountMetric() } returns Unit

0 commit comments

Comments
 (0)