Skip to content

Commit 1367b4f

Browse files
committed
Cleanup. Rev version.
1 parent 9e3f9bf commit 1367b4f

File tree

8 files changed

+120
-98
lines changed

8 files changed

+120
-98
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## Version 0.1.41
2+
3+
Datflow Load CDK: Fixes hang when one of many parallel pipelines fails. Organizes thread pools.
4+
15
## Version 0.1.40
26

37
Add gradle task to bump CDK version + add changelog entry
@@ -6,6 +10,8 @@ Add gradle task to bump CDK version + add changelog entry
610

711
Minor fixes with stream completion logic + proto conversion in Load CDK.
812

13+
* **Changed:** Minor fixes with stream completion logic + proto conversion
14+
915
## Version 0.1.38
1016

1117
Adds stats support for "speed" mode to the Load CDK

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/config/DispatcherBeanFactory.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ import kotlinx.coroutines.CoroutineScope
1313
import kotlinx.coroutines.Dispatchers
1414
import kotlinx.coroutines.asCoroutineDispatcher
1515

16-
/**
17-
* The dispatchers (think views of thread pools) and static scopes we use for dataflow.
18-
*/
16+
/** The dispatchers (think views of thread pools) and static scopes we use for dataflow. */
1917
@Factory
2018
class DispatcherBeanFactory {
2119
@Named("pipelineRunnerDispatcher")

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/pipeline/PipelineRunner.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ class PipelineRunner(
5656
reconciler.flushCompleteStates()
5757

5858
if (store.hasStates()) {
59-
val stateException = IllegalStateException("Sync completed, but unflushed states were detected.")
59+
val stateException =
60+
IllegalStateException("Sync completed, but unflushed states were detected.")
6061
log.info { "Destination Pipeline Completed — Exceptionally: $stateException" }
6162
throw stateException
6263
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/ClientSocket.kt

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,6 @@ class ClientSocket(
9090
// will break tests. TODO: Anything else.
9191
log.info { "Socket file $socketPath connected for reading" }
9292

93-
val inputStream = Channels.newInputStream(openedSocket).buffered(bufferSizeBytes)
94-
95-
return inputStream
96-
}
97-
}
98-
99-
class SocketInputStream(
100-
private val socketChannel: SocketChannel,
101-
private val inputStream: InputStream,
102-
) : InputStream() {
103-
override fun read(): Int = inputStream.read()
104-
105-
override fun close() {
106-
inputStream.close()
107-
socketChannel.close()
93+
return Channels.newInputStream(openedSocket).buffered(bufferSizeBytes)
10894
}
10995
}

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/dataflow/pipeline/DataFlowPipelineTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class DataFlowPipelineTest {
3434
val testScope = TestScope(this.testScheduler)
3535
val aggregationDispatcher = StandardTestDispatcher(testScope.testScheduler)
3636
val flushDispatcher = StandardTestDispatcher(testScope.testScheduler)
37-
37+
3838
// Given
3939
val initialIO = mockk<DataFlowStageIO>()
4040
val input = flowOf(initialIO)
@@ -68,7 +68,7 @@ class DataFlowPipelineTest {
6868

6969
// When
7070
pipeline.run()
71-
71+
7272
// Advance the test scheduler to process all coroutines
7373
testScope.testScheduler.advanceUntilIdle()
7474

@@ -81,4 +81,4 @@ class DataFlowPipelineTest {
8181
completionHandler.apply(null)
8282
}
8383
}
84-
}
84+
}

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/dataflow/pipeline/PipelineRunnerTest.kt

Lines changed: 103 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,15 @@ class PipelineRunnerTest {
5353
fun `run should execute all pipelines concurrently`() = runTest {
5454
// Given
5555
val pipelineScope = Fixtures.testScope(this.coroutineContext)
56-
val runner = PipelineRunner(reconciler, store, listOf(pipeline1, pipeline2, pipeline3), inputStreams, pipelineScope)
57-
56+
val runner =
57+
PipelineRunner(
58+
reconciler,
59+
store,
60+
listOf(pipeline1, pipeline2, pipeline3),
61+
inputStreams,
62+
pipelineScope
63+
)
64+
5865
coEvery { pipeline1.run() } coAnswers { delay(100) }
5966
coEvery { pipeline2.run() } coAnswers { delay(50) }
6067
coEvery { pipeline3.run() } coAnswers { delay(75) }
@@ -79,13 +86,15 @@ class PipelineRunnerTest {
7986
val exception = RuntimeException("Pipeline failed")
8087
coEvery { pipeline1.run() } throws exception
8188
val failingScope = Fixtures.testScope(this.coroutineContext)
82-
val failingRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, failingScope)
89+
val failingRunner =
90+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, failingScope)
8391

8492
// When/Then
85-
val thrownException = assertThrows<RuntimeException> {
86-
failingRunner.run()
87-
testScheduler.advanceUntilIdle()
88-
}
93+
val thrownException =
94+
assertThrows<RuntimeException> {
95+
failingRunner.run()
96+
testScheduler.advanceUntilIdle()
97+
}
8998

9099
assertEquals("Pipeline failed", thrownException.message)
91100

@@ -99,7 +108,8 @@ class PipelineRunnerTest {
99108
// Given
100109
coEvery { pipeline1.run() } just Runs
101110
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
102-
val singlePipelineRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
111+
val singlePipelineRunner =
112+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
103113

104114
// When
105115
singlePipelineRunner.run()
@@ -132,7 +142,8 @@ class PipelineRunnerTest {
132142
// Given
133143
coEvery { pipeline1.run() } just Runs
134144
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
135-
val singlePipelineRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
145+
val singlePipelineRunner =
146+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
136147

137148
// When
138149
singlePipelineRunner.run()
@@ -151,13 +162,15 @@ class PipelineRunnerTest {
151162
coEvery { pipeline1.run() } just Runs
152163
coEvery { reconciler.disable() } throws RuntimeException("Failed to disable")
153164
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
154-
val singlePipelineRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
165+
val singlePipelineRunner =
166+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
155167

156168
// When/Then
157-
val exception = assertThrows<RuntimeException> {
158-
singlePipelineRunner.run()
159-
testScheduler.advanceUntilIdle()
160-
}
169+
val exception =
170+
assertThrows<RuntimeException> {
171+
singlePipelineRunner.run()
172+
testScheduler.advanceUntilIdle()
173+
}
161174

162175
assertEquals("Failed to disable", exception.message)
163176

@@ -172,13 +185,15 @@ class PipelineRunnerTest {
172185
coEvery { pipeline1.run() } just Runs
173186
every { reconciler.flushCompleteStates() } throws RuntimeException("Failed to flush")
174187
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
175-
val singlePipelineRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
188+
val singlePipelineRunner =
189+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
176190

177191
// When/Then
178-
val exception = assertThrows<RuntimeException> {
179-
singlePipelineRunner.run()
180-
testScheduler.advanceUntilIdle()
181-
}
192+
val exception =
193+
assertThrows<RuntimeException> {
194+
singlePipelineRunner.run()
195+
testScheduler.advanceUntilIdle()
196+
}
182197

183198
assertEquals("Failed to flush", exception.message)
184199

@@ -193,28 +208,30 @@ class PipelineRunnerTest {
193208
// Given
194209
val exception = RuntimeException("Pipeline failed")
195210
coEvery { pipeline1.run() } throws exception
196-
211+
197212
val failingScope = Fixtures.testScope(this.coroutineContext)
198-
val failingRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, failingScope)
213+
val failingRunner =
214+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, failingScope)
199215

200216
// When
201-
assertThrows<RuntimeException> {
217+
assertThrows<RuntimeException> {
202218
failingRunner.run()
203219
testScheduler.advanceUntilIdle()
204220
}
205-
221+
206222
// Then - verify that closeAll was called due to exception handler
207223
verify(atLeast = 1) { inputStreams.closeAll() }
208224
}
209225

210226
@Test
211227
fun `run should handle large number of pipelines`() = runTest {
212228
// Given
213-
val pipelines = (1..32).map {
214-
val pipeline = mockk<DataFlowPipeline>()
215-
coEvery { pipeline.run() } just Runs
216-
pipeline
217-
}
229+
val pipelines =
230+
(1..32).map {
231+
val pipeline = mockk<DataFlowPipeline>()
232+
coEvery { pipeline.run() } just Runs
233+
pipeline
234+
}
218235

219236
val largeScope = Fixtures.testScope(this.coroutineContext)
220237
val largeRunner = PipelineRunner(reconciler, store, pipelines, inputStreams, largeScope)
@@ -237,28 +254,32 @@ class PipelineRunnerTest {
237254
var pipeline2Complete = false
238255
var reconcilerDisabled = false
239256

240-
coEvery { pipeline1.run() } coAnswers {
241-
delay(100)
242-
pipeline1Complete = true
243-
}
244-
coEvery { pipeline2.run() } coAnswers {
245-
delay(50)
246-
pipeline2Complete = true
247-
}
248-
coEvery { reconciler.disable() } answers {
249-
reconcilerDisabled = true
250-
assertTrue(
251-
pipeline1Complete,
252-
"Pipeline 1 should be complete before disabling reconciler"
253-
)
254-
assertTrue(
255-
pipeline2Complete,
256-
"Pipeline 2 should be complete before disabling reconciler"
257-
)
258-
}
257+
coEvery { pipeline1.run() } coAnswers
258+
{
259+
delay(100)
260+
pipeline1Complete = true
261+
}
262+
coEvery { pipeline2.run() } coAnswers
263+
{
264+
delay(50)
265+
pipeline2Complete = true
266+
}
267+
coEvery { reconciler.disable() } answers
268+
{
269+
reconcilerDisabled = true
270+
assertTrue(
271+
pipeline1Complete,
272+
"Pipeline 1 should be complete before disabling reconciler"
273+
)
274+
assertTrue(
275+
pipeline2Complete,
276+
"Pipeline 2 should be complete before disabling reconciler"
277+
)
278+
}
259279

260280
val twoScope = Fixtures.testScope(this.coroutineContext)
261-
val twoRunner = PipelineRunner(reconciler, store, listOf(pipeline1, pipeline2), inputStreams, twoScope)
281+
val twoRunner =
282+
PipelineRunner(reconciler, store, listOf(pipeline1, pipeline2), inputStreams, twoScope)
262283

263284
// When
264285
twoRunner.run()
@@ -269,39 +290,49 @@ class PipelineRunnerTest {
269290
}
270291

271292
@Test
272-
fun `run should throw IllegalStateException when unflushed states exist at sync end`() = runTest {
273-
// Given
274-
coEvery { pipeline1.run() } just Runs
275-
every { store.hasStates() } returns true // Simulate unflushed states exist
276-
277-
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
278-
val singlePipelineRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
279-
280-
// When/Then
281-
val exception = assertThrows<IllegalStateException> {
282-
singlePipelineRunner.run()
283-
testScheduler.advanceUntilIdle()
293+
fun `run should throw IllegalStateException when unflushed states exist at sync end`() =
294+
runTest {
295+
// Given
296+
coEvery { pipeline1.run() } just Runs
297+
every { store.hasStates() } returns true // Simulate unflushed states exist
298+
299+
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
300+
val singlePipelineRunner =
301+
PipelineRunner(
302+
reconciler,
303+
store,
304+
listOf(pipeline1),
305+
inputStreams,
306+
singlePipelineScope
307+
)
308+
309+
// When/Then
310+
val exception =
311+
assertThrows<IllegalStateException> {
312+
singlePipelineRunner.run()
313+
testScheduler.advanceUntilIdle()
314+
}
315+
316+
// Then
317+
assertEquals("Sync completed, but unflushed states were detected.", exception.message)
318+
319+
// Verify all operations completed before the check
320+
coVerify(exactly = 1) { reconciler.run() }
321+
coVerify(exactly = 1) { pipeline1.run() }
322+
coVerify(exactly = 1) { reconciler.disable() }
323+
coVerify(exactly = 1) { reconciler.flushCompleteStates() }
324+
coVerify(exactly = 1) { store.hasStates() }
284325
}
285326

286-
// Then
287-
assertEquals("Sync completed, but unflushed states were detected.", exception.message)
288-
289-
// Verify all operations completed before the check
290-
coVerify(exactly = 1) { reconciler.run() }
291-
coVerify(exactly = 1) { pipeline1.run() }
292-
coVerify(exactly = 1) { reconciler.disable() }
293-
coVerify(exactly = 1) { reconciler.flushCompleteStates() }
294-
coVerify(exactly = 1) { store.hasStates() }
295-
}
296-
297327
@Test
298328
fun `run should not throw exception when all states are flushed`() = runTest {
299329
// Given
300330
coEvery { pipeline1.run() } just Runs
301331
every { store.hasStates() } returns false // All states are flushed
302332

303333
val singlePipelineScope = Fixtures.testScope(this.coroutineContext)
304-
val singlePipelineRunner = PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
334+
val singlePipelineRunner =
335+
PipelineRunner(reconciler, store, listOf(pipeline1), inputStreams, singlePipelineScope)
305336

306337
// When - should complete without exception
307338
singlePipelineRunner.run()

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/dataflow/state/StateReconcilerTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class StateReconcilerTest {
5050
fun setUp() {
5151
testScope = TestScope(StandardTestDispatcher())
5252
reconcilerScope = CoroutineScope(testScope.coroutineContext)
53-
53+
5454
stateReconciler =
5555
StateReconciler(
5656
stateStore,
@@ -146,7 +146,7 @@ class StateReconcilerTest {
146146
every { emittedStatsStore.getStats() } returns statsList
147147

148148
every { consumer.accept(any<AirbyteMessage>()) } just Runs
149-
149+
150150
// Create a new reconciler with the test scope for this test
151151
val localReconciler =
152152
StateReconciler(
@@ -156,7 +156,7 @@ class StateReconcilerTest {
156156
this.backgroundScope,
157157
interval.toJavaDuration(),
158158
)
159-
159+
160160
// When
161161
localReconciler.run()
162162

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.1.40
1+
version=0.1.41

0 commit comments

Comments
 (0)