Skip to content

Commit 04db074

Browse files
benmoriceaugosusnp
andcommitted
chore: add the speed mode selection to the replication input (#16425)
Co-authored-by: Jimmy Ma <[email protected]>
1 parent 582aa97 commit 04db074

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

airbyte-server/src/main/kotlin/io/airbyte/server/services/JobInputService.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ class JobInputService(
208208
heartbeatMaxSecondsBetweenMessages = sourceDefinition.maxSecondsBetweenMessages,
209209
supportsRefreshes = sourceDefinitionVersion.supportsRefreshes,
210210
schemaRefreshOutput = appliedCatalogDiff?.let { RefreshSchemaActivityOutput(appliedDiff = it) },
211+
sourceIPCOptions = sourceDefinitionVersion.connectorIPCOptions,
212+
destinationIPCOptions = destinationDefinitionVersion.connectorIPCOptions,
211213
)
212214
}
213215

airbyte-server/src/test/kotlin/io/airbyte/server/services/JobInputServiceTest.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,19 +547,32 @@ class JobInputServiceTest {
547547
every { mockSourceDefinition.custom } returns false
548548
every { mockSourceDefinition.sourceType } returns StandardSourceDefinition.SourceType.DATABASE
549549

550+
val sourceICPOption =
551+
Jsons.jsonNode(
552+
mapOf(
553+
"source" to "ICPOption",
554+
),
555+
)
550556
val mockSourceDefinitionVersion = mockk<ActorDefinitionVersion>()
551557
every { mockSourceDefinitionVersion.dockerRepository } returns dockerRepository
552558
every { mockSourceDefinitionVersion.dockerImageTag } returns dockerImageTag
553559
every { mockSourceDefinitionVersion.protocolVersion } returns protocolVersion
554560
every { mockSourceDefinitionVersion.allowedHosts } returns AllowedHosts()
555561
every { mockSourceDefinitionVersion.supportsRefreshes } returns false
562+
every { mockSourceDefinitionVersion.connectorIPCOptions } returns sourceICPOption
556563

557564
val mockDestination = mockk<DestinationConnection>()
558565
every { mockDestination.destinationId } returns destinationId
559566
every { mockDestination.destinationDefinitionId } returns destinationDefinitionId
560567
every { mockDestination.workspaceId } returns workspaceId
561568
every { mockDestination.configuration } returns destinationConfiguration
562569

570+
val destinationICPOption =
571+
Jsons.jsonNode(
572+
mapOf(
573+
"destination" to "ICPOption",
574+
),
575+
)
563576
val mockDestinationDefinition = mockk<StandardDestinationDefinition>()
564577
every { mockDestinationDefinition.destinationDefinitionId } returns destinationDefinitionId
565578
every { mockDestinationDefinition.custom } returns true
@@ -569,6 +582,7 @@ class JobInputServiceTest {
569582
every { mockDestinationDefinitionVersion.dockerRepository } returns destinationImage
570583
every { mockDestinationDefinitionVersion.protocolVersion } returns protocolVersion
571584
every { mockDestinationDefinitionVersion.allowedHosts } returns AllowedHosts()
585+
every { mockDestinationDefinitionVersion.connectorIPCOptions } returns destinationICPOption
572586

573587
val attemptSyncConfig = mockk<AttemptSyncConfig>()
574588
every { attemptSyncConfig.sourceConfiguration } returns null
@@ -667,6 +681,8 @@ class JobInputServiceTest {
667681
heartbeatMaxSecondsBetweenMessages = 3600L,
668682
supportsRefreshes = false,
669683
schemaRefreshOutput = RefreshSchemaActivityOutput(appliedCatalogDiff),
684+
sourceIPCOptions = sourceICPOption,
685+
destinationIPCOptions = destinationICPOption,
670686
)
671687

672688
val actual = jobInputService.getReplicationInput(connectionId, appliedCatalogDiff, signalInput, jobId, attemptNumber.toLong())

0 commit comments

Comments
 (0)