Skip to content

Commit e703050

Browse files
committed
revert: "fix: detect source container OOM by monitoring pipe closure" (#18388)
1 parent 5ab831e commit e703050

File tree

4 files changed

+21
-272
lines changed

4 files changed

+21
-272
lines changed

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/worker/io/LocalContainerAirbyteDestination.kt

Lines changed: 3 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -35,25 +35,11 @@ class LocalContainerAirbyteDestination(
3535
private val containerIOHandle: ContainerIOHandle,
3636
private val containerLogMdcBuilder: MdcScope.Builder,
3737
private val flushImmediately: Boolean = false,
38-
private val exitCodeWaitSeconds: Long = EXIT_CODE_WAIT_SECONDS,
3938
) : AirbyteDestination {
40-
companion object {
41-
// Fallback exit code when the destination container is killed (e.g., OOM) and no exit code file is written
42-
const val FALLBACK_EXIT_CODE = 1
43-
44-
// Time to wait for exit code file after pipe closes (to handle race condition between pipe close and exit code write)
45-
const val EXIT_CODE_WAIT_SECONDS = 10L
46-
}
47-
4839
private val inputHasEnded = AtomicBoolean(false)
4940
private lateinit var messageIterator: Iterator<AirbyteMessage>
5041
private lateinit var writer: AirbyteMessageBufferedWriter<AirbyteMessage>
5142

52-
// Tracks when the output stream iterator is exhausted (pipes closed).
53-
// This is important for detecting OOM kills where the container dies without writing an exit code file.
54-
@Volatile
55-
private var outputStreamExhausted = false
56-
5743
override fun close() {
5844
emitDestinationMessageCountMetrics()
5945

@@ -118,55 +104,14 @@ class LocalContainerAirbyteDestination(
118104
}
119105

120106
override val isFinished: Boolean
121-
/**
107+
/*
122108
* As this check is done on every message read, it is important for this operation to be efficient.
123109
* Short circuit early to avoid checking the underlying process. Note: hasNext is blocking.
124-
*
125-
* We consider the destination finished in one case:
126-
* 1. The iterator has no more messages
127110
*/
128-
get() {
129-
if (outputStreamExhausted) {
130-
return true
131-
}
132-
val hasNext = messageIterator.hasNext()
133-
if (!hasNext) {
134-
outputStreamExhausted = true
135-
return true
136-
}
137-
return false
138-
}
111+
get() = !messageIterator.hasNext() && containerIOHandle.exitCodeExists()
139112

140113
override val exitValue: Int
141-
/**
142-
* Returns the exit code of the destination container.
143-
* If no exit code file exists but the output stream was exhausted (pipes closed),
144-
* this waits briefly for the exit code file to appear (to handle the race condition
145-
* between pipe close and exit code file write), then returns a fallback exit code
146-
* if the file still doesn't exist.
147-
*/
148-
get() {
149-
if (containerIOHandle.exitCodeExists()) {
150-
return containerIOHandle.getExitCode()
151-
}
152-
if (outputStreamExhausted) {
153-
// Wait for exit code file - there's a race condition where the pipe closes
154-
// before the shell script writes the exit code file
155-
try {
156-
Thread.sleep(exitCodeWaitSeconds * 1000)
157-
} catch (e: InterruptedException) {
158-
Thread.currentThread().interrupt()
159-
}
160-
if (containerIOHandle.exitCodeExists()) {
161-
return containerIOHandle.getExitCode()
162-
}
163-
logger.error {
164-
"No exit code file found after waiting ${exitCodeWaitSeconds}s. Container may have been killed (OOM or other signal). Returning fallback exit code $FALLBACK_EXIT_CODE."
165-
}
166-
return FALLBACK_EXIT_CODE
167-
}
168-
return containerIOHandle.getExitCode() // This will throw IllegalStateException as expected
169-
}
114+
get() = containerIOHandle.getExitCode()
170115

171116
override fun attemptRead(): Optional<AirbyteMessage> {
172117
val m = if (messageIterator.hasNext()) messageIterator.next() else null

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/worker/io/LocalContainerAirbyteSource.kt

Lines changed: 4 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -29,23 +29,9 @@ class LocalContainerAirbyteSource(
2929
private val messageMetricsTracker: MessageMetricsTracker,
3030
private val containerIOHandle: ContainerIOHandle,
3131
private val containerLogMdcBuilder: MdcScope.Builder,
32-
private val exitCodeWaitSeconds: Long = EXIT_CODE_WAIT_SECONDS,
3332
) : AirbyteSource {
34-
companion object {
35-
// Fallback exit code when the source container is killed (e.g., OOM) and no exit code file is written
36-
const val FALLBACK_EXIT_CODE = 1
37-
38-
// Time to wait for exit code file after pipe closes (to handle race condition between pipe close and exit code write)
39-
const val EXIT_CODE_WAIT_SECONDS = 10L
40-
}
41-
4233
private lateinit var messageIterator: Iterator<AirbyteMessage>
4334

44-
// Tracks when the output stream iterator is exhausted (pipes closed).
45-
// This is important for detecting OOM kills where the container dies without writing an exit code file.
46-
@Volatile
47-
private var outputStreamExhausted = false
48-
4935
override fun close() {
5036
messageMetricsTracker.flushSourceReadCountMetric()
5137
val terminationResult = containerIOHandle.terminate()
@@ -86,55 +72,14 @@ class LocalContainerAirbyteSource(
8672
}
8773

8874
override val isFinished: Boolean
89-
/**
75+
/*
9076
* As this check is done on every message read, it is important for this operation to be efficient.
91-
* Short circuit early to avoid checking the underlying process. Note: hasNext is blocking.
92-
*
93-
* We consider the source finished in one case:
94-
* 1. The iterator has no more messages
77+
* Short circuit early to avoid checking the underlying process. note: hasNext is blocking.
9578
*/
96-
get() {
97-
if (outputStreamExhausted) {
98-
return true
99-
}
100-
val hasNext = messageIterator.hasNext()
101-
if (!hasNext) {
102-
outputStreamExhausted = true
103-
return true
104-
}
105-
return false
106-
}
79+
get() = !messageIterator.hasNext() && containerIOHandle.exitCodeExists()
10780

10881
override val exitValue: Int
109-
/**
110-
* Returns the exit code of the source container.
111-
* If no exit code file exists but the output stream was exhausted (pipes closed),
112-
* this waits briefly for the exit code file to appear (to handle the race condition
113-
* between pipe close and exit code file write), then returns a fallback exit code
114-
* if the file still doesn't exist.
115-
*/
116-
get() {
117-
if (containerIOHandle.exitCodeExists()) {
118-
return containerIOHandle.getExitCode()
119-
}
120-
if (outputStreamExhausted) {
121-
// Wait for exit code file - there's a race condition where the pipe closes
122-
// before the shell script writes the exit code file
123-
try {
124-
Thread.sleep(exitCodeWaitSeconds * 1000)
125-
} catch (e: InterruptedException) {
126-
Thread.currentThread().interrupt()
127-
}
128-
if (containerIOHandle.exitCodeExists()) {
129-
return containerIOHandle.getExitCode()
130-
}
131-
logger.error {
132-
"No exit code file found after waiting ${exitCodeWaitSeconds}s. Container may have been killed (OOM or other signal). Returning fallback exit code $FALLBACK_EXIT_CODE."
133-
}
134-
return FALLBACK_EXIT_CODE
135-
}
136-
return containerIOHandle.getExitCode() // This will throw IllegalStateException as expected
137-
}
82+
get() = containerIOHandle.getExitCode()
13883

13984
override fun attemptRead(): Optional<AirbyteMessage> {
14085
val m = if (messageIterator.hasNext()) messageIterator.next() else null

airbyte-container-orchestrator/src/test/kotlin/io/airbyte/container/orchestrator/worker/io/LocalContainerAirbyteDestinationTest.kt

Lines changed: 7 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -213,8 +213,7 @@ internal class LocalContainerAirbyteDestinationTest {
213213
}
214214

215215
@Test
216-
internal fun testDestinationIsFinishedWithExitCode() {
217-
// Test normal finish with exit code file present
216+
internal fun testDestinationIsFinished() {
218217
val iterator =
219218
mockk<Iterator<AirbyteMessage>> {
220219
every { hasNext() } returns false
@@ -236,31 +235,15 @@ internal class LocalContainerAirbyteDestinationTest {
236235
jobRoot = jobRoot,
237236
)
238237
assertEquals(true, destination.isFinished)
239-
}
240238

241-
@Test
242-
internal fun testDestinationIsNotFinishedWhileHasMessages() {
243-
// Test that destination is not finished while there are still messages
244-
val iterator =
245-
mockk<Iterator<AirbyteMessage>> {
246-
every { hasNext() } returns true
247-
}
248-
every { stream.iterator() } returns iterator
239+
every { iterator.hasNext() } returns true
240+
assertEquals(false, destination.isFinished)
249241

250-
val destination =
251-
LocalContainerAirbyteDestination(
252-
streamFactory = streamFactory,
253-
messageMetricsTracker = messageMetricsTracker,
254-
messageWriterFactory = messageWriterFactory,
255-
containerIOHandle = containerIOHandle,
256-
containerLogMdcBuilder = containerLogMdcBuilder,
257-
destinationTimeoutMonitor = destinationTimeoutMonitor,
258-
)
242+
every { iterator.hasNext() } returns false
243+
exitValueFile.delete()
244+
assertEquals(false, destination.isFinished)
259245

260-
destination.start(
261-
destinationConfig = workerDestinationConfig,
262-
jobRoot = jobRoot,
263-
)
246+
every { iterator.hasNext() } returns true
264247
assertEquals(false, destination.isFinished)
265248
}
266249

@@ -321,7 +304,6 @@ internal class LocalContainerAirbyteDestinationTest {
321304
val mockedContainerIOHandle =
322305
mockk<ContainerIOHandle> {
323306
every { terminate() } returns true
324-
every { exitCodeExists() } returns true
325307
every { getExitCode() } returns exitValue
326308
}
327309

@@ -351,7 +333,6 @@ internal class LocalContainerAirbyteDestinationTest {
351333
mockk<ContainerIOHandle> {
352334
every { getErrInputStream() } returns mockk<InputStream>()
353335
every { terminate() } returns true
354-
every { exitCodeExists() } returns true
355336
every { getExitCode() } returns exitValue
356337
every { getInputStream() } returns mockk<InputStream>()
357338
every { getOutputStream() } returns mockk<OutputStream>()
@@ -390,7 +371,6 @@ internal class LocalContainerAirbyteDestinationTest {
390371
val mockedContainerIOHandle =
391372
mockk<ContainerIOHandle> {
392373
every { terminate() } returns true
393-
every { exitCodeExists() } returns true
394374
every { getExitCode() } returns exitValue
395375
}
396376
every { messageMetricsTracker.flushDestReadCountMetric() } returns Unit
@@ -419,7 +399,6 @@ internal class LocalContainerAirbyteDestinationTest {
419399
val mockedContainerIOHandle =
420400
mockk<ContainerIOHandle> {
421401
every { terminate() } returns false
422-
every { exitCodeExists() } returns true
423402
every { getExitCode() } returns exitValue
424403
}
425404
every { messageMetricsTracker.flushDestReadCountMetric() } returns Unit
@@ -448,7 +427,6 @@ internal class LocalContainerAirbyteDestinationTest {
448427
val mockedContainerIOHandle =
449428
mockk<ContainerIOHandle> {
450429
every { terminate() } returns false
451-
every { exitCodeExists() } returns true
452430
every { getExitCode() } returns exitValue
453431
}
454432
every { messageMetricsTracker.flushDestReadCountMetric() } returns Unit

0 commit comments

Comments
 (0)