Skip to content

Commit 0f0cf53

Browse files
fix: change the way we pick actor version to use the scoped version (#16459)
Co-authored-by: Pedro S. Lopez <[email protected]>
1 parent cb48042 commit 0f0cf53

File tree

2 files changed

+32
-4
lines changed

2 files changed

+32
-4
lines changed

airbyte-container-orchestrator/src/main/kotlin/io/airbyte/container/orchestrator/bookkeeping/StateCheckSumErrorReporter.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,26 @@ class StateCheckSumErrorReporter(
8181
FailureReason.FailureOrigin.SOURCE -> {
8282
val sourceId = retry { airbyteApiClient.connectionApi.getConnection(ConnectionIdRequestBody(connectionId)).sourceId }
8383
val source = retry { airbyteApiClient.sourceApi.getSource(SourceIdRequestBody(sourceId)) }
84+
val sourceVersion =
85+
retry { airbyteApiClient.actorDefinitionVersionApi.getActorDefinitionVersionForSourceId(SourceIdRequestBody(sourceId)) }
8486
val sourceDefinition =
8587
retry { airbyteApiClient.sourceDefinitionApi.getSourceDefinition(SourceDefinitionIdRequestBody(source.sourceDefinitionId)) }
86-
dockerImageName = getDockerImageName(sourceDefinition.dockerRepository, sourceDefinition.dockerImageTag)
88+
dockerImageName = getDockerImageName(sourceVersion.dockerRepository, sourceVersion.dockerImageTag)
8789
metadata =
8890
getDefinitionMetadata(sourceDefinition.sourceDefinitionId, sourceDefinition.name, dockerImageName, sourceDefinition.releaseStage)
8991
}
9092
FailureReason.FailureOrigin.DESTINATION -> {
9193
val destinationId = retry { airbyteApiClient.connectionApi.getConnection(ConnectionIdRequestBody(connectionId)).destinationId }
9294
val destination = retry { airbyteApiClient.destinationApi.getDestination(DestinationIdRequestBody(destinationId)) }
95+
val destinationVersion =
96+
retry { airbyteApiClient.actorDefinitionVersionApi.getActorDefinitionVersionForDestinationId(DestinationIdRequestBody(destinationId)) }
9397
val destinationDefinition =
9498
retry {
9599
airbyteApiClient.destinationDefinitionApi.getDestinationDefinition(
96100
DestinationDefinitionIdRequestBody(destination.destinationDefinitionId),
97101
)
98102
}
99-
dockerImageName = getDockerImageName(destinationDefinition.dockerRepository, destinationDefinition.dockerImageTag)
103+
dockerImageName = getDockerImageName(destinationVersion.dockerRepository, destinationVersion.dockerImageTag)
100104
metadata =
101105
getDefinitionMetadata(
102106
destinationDefinition.destinationDefinitionId,

airbyte-container-orchestrator/src/test/kotlin/io/airbyte/container/orchestrator/bookkeeping/StateCheckSumErrorReporterTest.kt

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ package io.airbyte.container.orchestrator.bookkeeping
77
import com.amazonaws.internal.ExceptionUtils
88
import io.airbyte.api.client.AirbyteApiClient
99
import io.airbyte.api.client.WebUrlHelper
10+
import io.airbyte.api.client.model.generated.ActorDefinitionVersionRead
1011
import io.airbyte.api.client.model.generated.ConnectionRead
1112
import io.airbyte.api.client.model.generated.DestinationDefinitionRead
1213
import io.airbyte.api.client.model.generated.DestinationRead
1314
import io.airbyte.api.client.model.generated.ReleaseStage
1415
import io.airbyte.api.client.model.generated.SourceDefinitionRead
1516
import io.airbyte.api.client.model.generated.SourceRead
17+
import io.airbyte.api.client.model.generated.SupportState
1618
import io.airbyte.commons.json.Jsons
1719
import io.airbyte.config.Configs
1820
import io.airbyte.config.FailureReason
@@ -104,6 +106,16 @@ class StateCheckSumErrorReporterTest {
104106

105107
val sourceId = UUID.randomUUID()
106108
val sourceDefinitionId = UUID.randomUUID()
109+
val sourceVersion =
110+
ActorDefinitionVersionRead(
111+
"source-repo",
112+
"0.0.1",
113+
supportsRefreshes = true,
114+
isVersionOverrideApplied = true,
115+
SupportState.SUPPORTED,
116+
supportsFileTransfer = false,
117+
supportsDataActivation = false,
118+
)
107119
val sourceDefinition =
108120
SourceDefinitionRead(
109121
sourceDefinitionId,
@@ -121,6 +133,7 @@ class StateCheckSumErrorReporterTest {
121133

122134
every { airbyteApiClient.connectionApi.getConnection(any()) } returns connection
123135
every { airbyteApiClient.sourceApi.getSource(any()) } returns source
136+
every { airbyteApiClient.actorDefinitionVersionApi.getActorDefinitionVersionForSourceId(any()) } returns sourceVersion
124137
every { airbyteApiClient.sourceDefinitionApi.getSourceDefinition(any()) } returns sourceDefinition
125138

126139
stateCheckSumErrorReporter.reportError(
@@ -141,7 +154,7 @@ class StateCheckSumErrorReporterTest {
141154
).reportJobFailureReason(
142155
any(StandardWorkspace::class.java),
143156
any(FailureReason::class.java),
144-
eq("source-repo:0.1.0"),
157+
eq("source-repo:0.0.1"),
145158
anyMap(),
146159
ArgumentMatchers.eq(AttemptConfigReportingContext(null, null, State().withState(Jsons.jsonNode(stateMessage)))),
147160
)
@@ -160,6 +173,16 @@ class StateCheckSumErrorReporterTest {
160173

161174
val destinationId = UUID.randomUUID()
162175
val destinationDefinitionId = UUID.randomUUID()
176+
val destinationVersion =
177+
ActorDefinitionVersionRead(
178+
"destination-repo",
179+
"0.0.1",
180+
supportsRefreshes = true,
181+
isVersionOverrideApplied = true,
182+
SupportState.SUPPORTED,
183+
supportsFileTransfer = false,
184+
supportsDataActivation = false,
185+
)
163186

164187
val destinationDefinition =
165188
DestinationDefinitionRead(
@@ -179,6 +202,7 @@ class StateCheckSumErrorReporterTest {
179202

180203
every { airbyteApiClient.connectionApi.getConnection(any()) } returns connection
181204
every { airbyteApiClient.destinationApi.getDestination(any()) } returns destination
205+
every { airbyteApiClient.actorDefinitionVersionApi.getActorDefinitionVersionForDestinationId(any()) } returns destinationVersion
182206
every { airbyteApiClient.destinationDefinitionApi.getDestinationDefinition(any()) } returns destinationDefinition
183207

184208
stateCheckSumErrorReporter.reportError(
@@ -199,7 +223,7 @@ class StateCheckSumErrorReporterTest {
199223
).reportJobFailureReason(
200224
any(StandardWorkspace::class.java),
201225
any(FailureReason::class.java),
202-
eq("destination-repo:0.1.0"),
226+
eq("destination-repo:0.0.1"),
203227
anyMap(),
204228
ArgumentMatchers.eq(AttemptConfigReportingContext(null, null, State().withState(Jsons.jsonNode(stateMessage)))),
205229
)

0 commit comments

Comments
 (0)