Skip to content

Commit 139f689

Browse files
benmoriceaugosusnp
andcommitted
feat: replicate command (#16375)
Co-authored-by: Jimmy Ma <[email protected]>
1 parent 8427690 commit 139f689

File tree

15 files changed

+736
-1
lines changed

15 files changed

+736
-1
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6294,6 +6294,60 @@ paths:
62946294
type: string
62956295
enum: [pending, running, completed, cancelled]
62966296

6297+
/v1/commands/get:
6298+
post:
6299+
summary: Get the a command record
6300+
tags: [command, internal]
6301+
operationId: getCommand
6302+
requestBody:
6303+
content:
6304+
application/json:
6305+
schema:
6306+
type: object
6307+
title: CommandGetRequest
6308+
required: [id]
6309+
properties:
6310+
id:
6311+
type: string
6312+
responses:
6313+
"200":
6314+
description: Success
6315+
content:
6316+
application/json:
6317+
schema:
6318+
type: object
6319+
title: CommandGetResponse
6320+
required:
6321+
- id
6322+
- workload_id
6323+
- command_input
6324+
- command_type
6325+
- workspace_id
6326+
- organization_id
6327+
- created_at
6328+
- updated_at
6329+
properties:
6330+
id:
6331+
type: string
6332+
workload_id:
6333+
type: string
6334+
command_type:
6335+
type: string
6336+
command_input:
6337+
type: object
6338+
workspace_id:
6339+
type: string
6340+
format: uuid
6341+
organization_id:
6342+
type: string
6343+
format: uuid
6344+
created_at:
6345+
type: string
6346+
format: date-time
6347+
updated_at:
6348+
type: string
6349+
format: date-time
6350+
62976351
/v1/commands/cancel:
62986352
post:
62996353
summary: Get the status of command

airbyte-commons-temporal/src/main/kotlin/io/airbyte/commons/temporal/scheduling/ConnectorCommandWorkflow.kt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package io.airbyte.commons.temporal.scheduling
77
import com.fasterxml.jackson.annotation.JsonSubTypes
88
import com.fasterxml.jackson.annotation.JsonTypeInfo
99
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
10+
import io.airbyte.config.CatalogDiff
1011
import io.airbyte.config.ConnectorJobOutput
1112
import io.airbyte.config.StandardCheckConnectionInput
1213
import io.airbyte.config.StandardDiscoverCatalogInput
@@ -28,6 +29,7 @@ import java.util.UUID
2829
JsonSubTypes.Type(value = DiscoverCommandInput::class, name = ConnectorCommandInput.DISCOVER),
2930
JsonSubTypes.Type(value = DiscoverCommandApiInput::class, name = ConnectorCommandInput.DISCOVER_COMMAND),
3031
JsonSubTypes.Type(value = SpecCommandInput::class, name = ConnectorCommandInput.SPEC),
32+
JsonSubTypes.Type(value = ReplicationCommandApiInput::class, name = ConnectorCommandInput.REPLICATION_COMMAND),
3133
)
3234
sealed interface ConnectorCommandInput {
3335
companion object {
@@ -36,6 +38,7 @@ sealed interface ConnectorCommandInput {
3638
const val DISCOVER = "discover"
3739
const val DISCOVER_COMMAND = "discover_command"
3840
const val SPEC = "spec"
41+
const val REPLICATION_COMMAND = "replication_command"
3942
}
4043

4144
val type: String
@@ -281,6 +284,57 @@ data class SpecCommandInput(
281284
}
282285
}
283286

287+
@JsonDeserialize(builder = ReplicationCommandApiInput.Builder::class)
288+
data class ReplicationCommandApiInput(
289+
val input: ReplicationApiInput,
290+
) : ConnectorCommandInput {
291+
override val type: String = ConnectorCommandInput.REPLICATION_COMMAND
292+
293+
// This is duplicated of io.airbyte.workers.model.CheckConnectionInput to avoid dependency hell
294+
@JsonDeserialize(builder = ReplicationApiInput.Builder::class)
295+
data class ReplicationApiInput(
296+
val connectionId: UUID,
297+
val jobId: String,
298+
val attemptId: Long,
299+
val appliedCatalogDiff: CatalogDiff?,
300+
) {
301+
class Builder
302+
@JvmOverloads
303+
constructor(
304+
var connectionId: UUID? = null,
305+
var jobId: String? = null,
306+
var attemptId: Long? = null,
307+
var appliedCatalogDiff: CatalogDiff? = null,
308+
) {
309+
fun actorId(connectionId: UUID) = apply { this.connectionId = connectionId }
310+
311+
fun jobId(jobId: String) = apply { this.jobId = jobId }
312+
313+
fun attemptId(attemptId: Long) = apply { this.attemptId = attemptId }
314+
315+
fun appliedCatalogDiff(appliedCatalogDiff: CatalogDiff) = apply { this.appliedCatalogDiff = appliedCatalogDiff }
316+
317+
fun build() =
318+
ReplicationApiInput(
319+
connectionId = connectionId ?: throw IllegalArgumentException("actorId must be specified"),
320+
jobId = jobId ?: throw IllegalArgumentException("jobId must be specified"),
321+
attemptId = attemptId ?: throw IllegalArgumentException("attemptId must be specified"),
322+
appliedCatalogDiff = appliedCatalogDiff,
323+
)
324+
}
325+
}
326+
327+
class Builder
328+
@JvmOverloads
329+
constructor(
330+
var input: ReplicationApiInput? = null,
331+
) {
332+
fun input(input: ReplicationApiInput) = apply { this.input = input }
333+
334+
fun build() = ReplicationCommandApiInput(input = input ?: throw IllegalArgumentException("input must be specified"))
335+
}
336+
}
337+
284338
@WorkflowInterface
285339
interface ConnectorCommandWorkflow {
286340
@WorkflowMethod
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.temporal.scheduling
6+
7+
import io.airbyte.commons.json.Jsons
8+
import io.airbyte.config.CatalogDiff
9+
import org.junit.jupiter.api.Assertions
10+
import org.junit.jupiter.api.Test
11+
import java.util.UUID
12+
13+
class ConnectorCommandWorkflowTest {
14+
@Test
15+
fun `appliedCatalogDiff sets the appliedCatalogDiff property`() {
16+
val replicationApiInput =
17+
ReplicationCommandApiInput.ReplicationApiInput(
18+
connectionId = UUID.randomUUID(),
19+
jobId = "123",
20+
attemptId = 1,
21+
appliedCatalogDiff =
22+
CatalogDiff().withAdditionalProperty(
23+
"key",
24+
"value",
25+
),
26+
)
27+
28+
val replicationCommandApiInput =
29+
ReplicationCommandApiInput(
30+
input = replicationApiInput,
31+
)
32+
33+
val serializedApiInput = Jsons.serialize(replicationCommandApiInput)
34+
val deserializedApiInput = Jsons.deserialize(serializedApiInput, ReplicationCommandApiInput::class.java)
35+
36+
Assertions.assertEquals(replicationCommandApiInput, deserializedApiInput)
37+
}
38+
}

airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ properties:
1414
- checkConnection
1515
- discoverCatalogId
1616
- spec
17+
- replicate
1718
checkConnection:
1819
"$ref": StandardCheckConnectionOutput.yaml
1920
discoverCatalogId:
@@ -26,5 +27,7 @@ properties:
2627
description: A boolean indicating whether the configuration was updated during the job, e.g. if an AirbyteConfigControlMessage was received.
2728
type: boolean
2829
default: false
30+
replicate:
31+
"$ref": StandardSyncOutput.yaml
2932
failureReason:
3033
"$ref": FailureReason.yaml

airbyte-connector-sidecar/src/main/kotlin/io/airbyte/connectorSidecar/ConnectorMessageProcessor.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ class ConnectorMessageProcessor(
315315
ConnectorJobOutput.OutputType.SPEC -> ConnectorCommand.SPEC
316316
ConnectorJobOutput.OutputType.CHECK_CONNECTION -> ConnectorCommand.CHECK
317317
ConnectorJobOutput.OutputType.DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER
318+
ConnectorJobOutput.OutputType.REPLICATE -> throw IllegalStateException("Cannot get connector command from output type $outputType")
318319
}
319320

320321
fun getMessagesByType(

airbyte-server/src/main/kotlin/io/airbyte/server/apis/controllers/CommandApiController.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import io.airbyte.api.model.generated.CancelCommandRequest
99
import io.airbyte.api.model.generated.CancelCommandResponse
1010
import io.airbyte.api.model.generated.CheckCommandOutputRequest
1111
import io.airbyte.api.model.generated.CheckCommandOutputResponse
12+
import io.airbyte.api.model.generated.CommandGetRequest
13+
import io.airbyte.api.model.generated.CommandGetResponse
1214
import io.airbyte.api.model.generated.CommandStatusRequest
1315
import io.airbyte.api.model.generated.CommandStatusResponse
1416
import io.airbyte.api.model.generated.DiscoverCommandOutputRequest
@@ -77,6 +79,27 @@ class CommandApiController(
7779
}
7880
}
7981

82+
@Post("/get")
83+
@Secured(AuthRoleConstants.WORKSPACE_READER)
84+
@ExecuteOn(AirbyteTaskExecutors.IO)
85+
override fun getCommand(
86+
@Body commandGetRequest: CommandGetRequest,
87+
): CommandGetResponse {
88+
val command = commandService.get(commandGetRequest.id)
89+
return CommandGetResponse().apply {
90+
id(commandGetRequest?.id)
91+
command?.let {
92+
commandType(it.commandType)
93+
commandInput(it.commandInput)
94+
workspaceId(it.workspaceId)
95+
workloadId(it.workloadId)
96+
organizationId(it.organizationId)
97+
createdAt(it.createdAt)
98+
updatedAt(it.updatedAt)
99+
}
100+
}
101+
}
102+
80103
@Post("/output/discover")
81104
@Secured(AuthRoleConstants.WORKSPACE_READER)
82105
@ExecuteOn(AirbyteTaskExecutors.IO)

airbyte-server/src/main/kotlin/io/airbyte/server/services/CommandService.kt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,33 @@ class CommandService(
428428
}
429429
}
430430

431+
data class CommandModel(
432+
val id: String,
433+
val workloadId: String,
434+
val commandType: String,
435+
val commandInput: JsonNode,
436+
val workspaceId: UUID,
437+
val organizationId: UUID,
438+
val createdAt: OffsetDateTime,
439+
val updatedAt: OffsetDateTime,
440+
)
441+
442+
fun get(commandId: String): CommandModel? =
443+
commandsRepository
444+
.findById(commandId)
445+
.map { command ->
446+
CommandModel(
447+
id = command.id,
448+
workloadId = command.workloadId,
449+
commandType = command.commandType,
450+
commandInput = command.commandInput,
451+
workspaceId = command.workspaceId,
452+
organizationId = command.organizationId,
453+
createdAt = command.createdAt ?: OffsetDateTime.now(),
454+
updatedAt = command.updatedAt ?: OffsetDateTime.now(),
455+
)
456+
}.orElse(null)
457+
431458
fun getStatus(commandId: String): CommandStatus? =
432459
commandsRepository
433460
.findById(commandId)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.models
6+
7+
import io.airbyte.config.CatalogDiff
8+
import java.util.UUID
9+
10+
data class ReplicationApiInput(
11+
val connectionId: UUID,
12+
val jobId: String,
13+
val attemptId: Long,
14+
val appliedCatalogDiff: CatalogDiff?,
15+
)

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/AsyncReplicationActivityImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public StandardSyncOutput getReplicationOutput(final ReplicationActivityInput re
201201
final var workerAndReplicationInput = getWorkerAndReplicationInput(replicationActivityInput);
202202
final WorkloadApiWorker worker = workerAndReplicationInput.worker;
203203

204-
final var output = worker.getOutput(workloadId);
204+
final ReplicationOutput output = worker.getOutput(workloadId);
205205
return finalizeOutput(replicationActivityInput, output);
206206
} catch (final Exception e) {
207207
ApmTraceUtils.addActualRootCauseToTrace(e);

0 commit comments

Comments
 (0)