Skip to content

Commit 1c29927

Browse files
tryangulsubodh1810
andauthored
Rbroughan/fix proto input stream hang (#66576)
Co-authored-by: Subodh Kant Chaturvedi <[email protected]>
1 parent 0db743d commit 1c29927

17 files changed

+404
-98
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## Version 0.1.42
2+
3+
Datflow Load CDK: Fixes hang when one of many parallel pipelines fails. Organizes thread pools.
4+
15
## Version 0.1.41
26

37
**Extract CDK**
@@ -12,6 +16,8 @@ Add gradle task to bump CDK version + add changelog entry
1216

1317
Minor fixes with stream completion logic + proto conversion in Load CDK.
1418

19+
* **Changed:** Minor fixes with stream completion logic + proto conversion
20+
1521
## Version 0.1.38
1622

1723
Adds stats support for "speed" mode to the Load CDK

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/config/ConnectorInputStreams.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ import java.io.InputStream
1111
* subject to Micronaut merging like beans into a single list leading to injecting unexpected extra
1212
* input streams.
1313
*/
14-
class ConnectorInputStreams(private val values: List<InputStream>) : List<InputStream> by values
14+
class ConnectorInputStreams(private val values: List<InputStream>) : List<InputStream> by values {
15+
fun closeAll() = values.forEach { it.close() }
16+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.dataflow.config
6+
7+
import io.micronaut.context.annotation.Factory
8+
import jakarta.inject.Named
9+
import jakarta.inject.Singleton
10+
import java.util.concurrent.Executors
11+
import kotlinx.coroutines.CoroutineDispatcher
12+
import kotlinx.coroutines.CoroutineScope
13+
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.asCoroutineDispatcher
15+
16+
/** The dispatchers (think views of thread pools) and static scopes we use for dataflow. */
17+
@Factory
18+
class DispatcherBeanFactory {
19+
@Named("pipelineRunnerDispatcher")
20+
@Singleton
21+
fun pipelineRunnerDispatcher() = Dispatchers.Default
22+
23+
@Named("stateReconcilerDispatcher") @Singleton fun stateReconcilerDispatcher() = Dispatchers.IO
24+
25+
@Named("aggregationDispatcher")
26+
@Singleton
27+
fun aggregationDispatcher(
28+
@Named("inputStreams") inputStreams: ConnectorInputStreams,
29+
) = Executors.newFixedThreadPool(inputStreams.size).asCoroutineDispatcher()
30+
31+
@Named("flushDispatcher") @Singleton fun flushDispatcher() = Dispatchers.IO
32+
33+
@Named("pipelineRunnerScope")
34+
@Singleton
35+
fun pipelineRunnerDispatcher(
36+
@Named("pipelineRunnerDispatcher") dispatcher: CoroutineDispatcher,
37+
) = CoroutineScope(dispatcher)
38+
39+
@Named("stateReconcilerScope")
40+
@Singleton
41+
fun stateDispatcher(
42+
@Named("stateReconcilerDispatcher") dispatcher: CoroutineDispatcher,
43+
) = CoroutineScope(dispatcher)
44+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/config/InputBeanFactory.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import io.micronaut.context.annotation.Requires
3030
import io.micronaut.context.annotation.Value
3131
import jakarta.inject.Named
3232
import jakarta.inject.Singleton
33+
import kotlinx.coroutines.CoroutineDispatcher
3334
import kotlinx.coroutines.flow.Flow
3435

3536
/**
@@ -134,6 +135,8 @@ class InputBeanFactory {
134135
stateHistogramStore: StateHistogramStore,
135136
statsStore: CommittedStatsStore,
136137
memoryAndParallelismConfig: MemoryAndParallelismConfig,
138+
@Named("aggregationDispatcher") aggregationDispatcher: CoroutineDispatcher,
139+
@Named("flushDispatcher") flushDispatcher: CoroutineDispatcher,
137140
): List<DataFlowPipeline> =
138141
inputFlows.map {
139142
val aggStore = aggregateStoreFactory.make()
@@ -153,6 +156,8 @@ class InputBeanFactory {
153156
state = state,
154157
completionHandler = completionHandler,
155158
memoryAndParallelismConfig = memoryAndParallelismConfig,
159+
aggregationDispatcher = aggregationDispatcher,
160+
flushDispatcher = flushDispatcher,
156161
)
157162
}
158163
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/input/ProtobufDestinationMessageInputFlow.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import kotlinx.coroutines.flow.Flow
1313
import kotlinx.coroutines.flow.FlowCollector
1414
import kotlinx.coroutines.withContext
1515

16+
/**
17+
* Performs non-cooperative blocking IO. Does not respond directly to coroutine
18+
* CancellationExceptions.
19+
*/
1620
class ProtobufDestinationMessageInputFlow(
1721
private val inputStream: InputStream,
1822
private val reader: ProtobufDataChannelReader,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/pipeline/DataFlowPipeline.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ package io.airbyte.cdk.load.dataflow.pipeline
66

77
import io.airbyte.cdk.load.dataflow.config.MemoryAndParallelismConfig
88
import io.airbyte.cdk.load.dataflow.stages.AggregateStage
9-
import java.util.concurrent.Executors
10-
import kotlinx.coroutines.Dispatchers
11-
import kotlinx.coroutines.asCoroutineDispatcher
9+
import kotlinx.coroutines.CoroutineDispatcher
1210
import kotlinx.coroutines.flow.Flow
1311
import kotlinx.coroutines.flow.buffer
1412
import kotlinx.coroutines.flow.flowOn
@@ -24,19 +22,19 @@ class DataFlowPipeline(
2422
private val state: DataFlowStage,
2523
private val completionHandler: PipelineCompletionHandler,
2624
private val memoryAndParallelismConfig: MemoryAndParallelismConfig,
25+
private val aggregationDispatcher: CoroutineDispatcher,
26+
private val flushDispatcher: CoroutineDispatcher,
2727
) {
28-
private val customDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
29-
3028
suspend fun run() {
3129
input
3230
.map(parse::apply)
3331
.transform { aggregate.apply(it, this) }
3432
.buffer(capacity = memoryAndParallelismConfig.maxBufferedAggregates)
35-
.flowOn(customDispatcher)
33+
.flowOn(aggregationDispatcher)
3634
.map(flush::apply)
3735
.map(state::apply)
3836
.onCompletion { completionHandler.apply(it) }
39-
.flowOn(Dispatchers.IO)
37+
.flowOn(flushDispatcher)
4038
.collect {}
4139
}
4240
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/pipeline/PipelineCompletionHandler.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ class PipelineCompletionHandler(
2323
cause: Throwable?,
2424
) = coroutineScope {
2525
if (cause != null) {
26-
log.error { "Destination Pipeline Completed — Exceptionally" }
2726
throw cause
2827
}
2928

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/pipeline/PipelineRunner.kt

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,48 +4,84 @@
44

55
package io.airbyte.cdk.load.dataflow.pipeline
66

7+
import io.airbyte.cdk.load.dataflow.config.ConnectorInputStreams
78
import io.airbyte.cdk.load.dataflow.state.StateReconciler
89
import io.airbyte.cdk.load.dataflow.state.StateStore
910
import io.github.oshai.kotlinlogging.KotlinLogging
11+
import jakarta.inject.Named
1012
import jakarta.inject.Singleton
1113
import java.lang.IllegalStateException
14+
import kotlinx.coroutines.CoroutineDispatcher
15+
import kotlinx.coroutines.CoroutineExceptionHandler
1216
import kotlinx.coroutines.CoroutineScope
13-
import kotlinx.coroutines.Dispatchers
14-
import kotlinx.coroutines.coroutineScope
17+
import kotlinx.coroutines.ExecutorCoroutineDispatcher
18+
import kotlinx.coroutines.cancel
1519
import kotlinx.coroutines.joinAll
1620
import kotlinx.coroutines.launch
1721

22+
/**
23+
* Orchestrates the running of pipelines in parallel, handles and propagates errors and manages the
24+
* state reconciler lifecycle.
25+
*/
1826
@Singleton
1927
class PipelineRunner(
2028
private val reconciler: StateReconciler,
2129
private val store: StateStore,
22-
val pipelines: List<DataFlowPipeline>,
30+
private val pipelines: List<DataFlowPipeline>,
31+
private val inputStreams: ConnectorInputStreams,
32+
@Named("pipelineRunnerScope") private val pipelineScope: CoroutineScope,
33+
@Named("aggregationDispatcher") private val aggregationDispatcher: CoroutineDispatcher,
2334
) {
2435
private val log = KotlinLogging.logger {}
2536

26-
suspend fun run() = coroutineScope {
37+
private var terminalException: Throwable? = null
38+
39+
suspend fun run() {
2740
log.info { "Destination Pipeline Starting..." }
28-
log.info { "Running with ${pipelines.size} input streams..." }
2941

30-
reconciler.run(CoroutineScope(Dispatchers.IO))
42+
log.info { "Starting state reconciler..." }
43+
reconciler.run()
3144

45+
log.info { "Starting ${pipelines.size} pipelines..." }
46+
pipelines.map { p -> pipelineScope.launch(exceptionHandler) { p.run() } }.joinAll()
47+
log.info { "Individual pipelines complete..." }
48+
49+
// shutdown the reconciler regardless of success or failure, so we don't hang
3250
try {
33-
pipelines.map { p -> launch { p.run() } }.joinAll()
34-
log.info { "Individual pipelines complete..." }
35-
} finally {
36-
// shutdown the reconciler regardless of success or failure, so we don't hang
37-
log.info { "Disabling reconciler..." }
51+
log.info { "Disabling state reconciler..." }
3852
reconciler.disable()
53+
} finally {
54+
if (aggregationDispatcher is ExecutorCoroutineDispatcher) {
55+
aggregationDispatcher.close()
56+
}
57+
}
58+
59+
terminalException?.let {
60+
log.error(terminalException) { "Destination Pipeline Completed — Exceptionally" }
61+
throw it
3962
}
4063

4164
log.info { "Flushing final states..." }
4265
reconciler.flushCompleteStates()
4366

44-
log.info { "Destination Pipeline Completed — Successfully" }
45-
4667
if (store.hasStates()) {
47-
log.info { "Unflushed states detected. Failing sync." }
48-
throw IllegalStateException("Sync completed, but unflushed states were detected.")
68+
val stateException =
69+
IllegalStateException("Sync completed, but unflushed states were detected.")
70+
log.info { "Destination Pipeline Completed — Exceptionally: $stateException" }
71+
throw stateException
4972
}
73+
74+
log.info { "Destination Pipeline Completed — Successfully" }
75+
}
76+
77+
// ensure all pipelines close when a single pipeline throws an exception
78+
private val exceptionHandler = CoroutineExceptionHandler { context, exception ->
79+
log.error { "Caught Pipeline Exception: $exception\n Cancelling Destination Pipeline..." }
80+
// close each input stream to cancel any blocking reads on them that would prevent shutdown
81+
inputStreams.closeAll()
82+
context.cancel()
83+
// capture the child exception to re-throw (core-CDK relies on catching thrown exceptions)
84+
// exceptions directly thrown from within this block will be suppressed
85+
terminalException = exception
5086
}
5187
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/StateReconciler.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ class StateReconciler(
2424
private val stateStore: StateStore,
2525
private val emittedStatsStore: EmittedStatsStore,
2626
private val consumer: OutputConsumer,
27-
@Named("stateReconciliationInterval")
28-
reconciliationInterval: Duration?, // only java durations can be injected
27+
@Named("stateReconcilerScope") private val scope: CoroutineScope,
28+
@Named("stateReconcilerInterval") interval: Duration?, // only java durations can be injected
2929
) {
3030
// allow overriding this for test purposes
31-
private val reconciliationInterval = reconciliationInterval?.toKotlinDuration() ?: 30.seconds
31+
private val interval = interval?.toKotlinDuration() ?: 30.seconds
3232
private lateinit var job: Job
3333

34-
fun run(scope: CoroutineScope) {
34+
fun run() {
3535
job =
3636
scope.launch {
3737
while (true) {
38-
delay(reconciliationInterval)
38+
delay(interval)
3939
flushCompleteStates()
4040
flushEmittedStats()
4141
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/ClientSocket.kt

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,6 @@ class ClientSocket(
9090
// will break tests. TODO: Anything else.
9191
log.info { "Socket file $socketPath connected for reading" }
9292

93-
val inputStream = Channels.newInputStream(openedSocket).buffered(bufferSizeBytes)
94-
95-
return SocketInputStream(openedSocket, inputStream)
96-
}
97-
}
98-
99-
class SocketInputStream(
100-
private val socketChannel: SocketChannel,
101-
private val inputStream: InputStream,
102-
) : InputStream() {
103-
override fun read(): Int = inputStream.read()
104-
105-
override fun close() {
106-
inputStream.close()
107-
socketChannel.close()
93+
return Channels.newInputStream(openedSocket).buffered(bufferSizeBytes)
10894
}
10995
}

0 commit comments

Comments
 (0)