Skip to content

Commit e260995

Browse files
committed
fix: command api and sync workflow usage of the api (#16489)
1 parent 6ec210b commit e260995

File tree

6 files changed

+26
-15
lines changed

6 files changed

+26
-15
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3934,7 +3934,14 @@ paths:
39343934
content:
39353935
application/json:
39363936
schema:
3937+
title: GetWebhookConfigResponse
39373938
type: object
3939+
required:
3940+
- value
3941+
properties:
3942+
value:
3943+
type: string
3944+
39383945
/v1/jobs/job_failure:
39393946
post:
39403947
tags:
@@ -6364,7 +6371,7 @@ paths:
63646371
command_type:
63656372
type: string
63666373
command_input:
6367-
type: object
6374+
type: string
63686375
workspace_id:
63696376
type: string
63706377
format: uuid

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobInputHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ private void saveAttemptSyncConfig(final long jobId, final int attemptNumber, fi
342342
.syncConfig(apiPojoConverters.attemptSyncConfigToApi(attemptSyncConfig, connectionId)));
343343
}
344344

345-
public Object getJobWebhookConfig(final long jobId) throws IOException {
345+
public JobWebhookConfig getJobWebhookConfig(final long jobId) throws IOException {
346346
final Job job = jobPersistence.getJob(jobId);
347347
final JobConfig jobConfig = job.getConfig();
348348
if (jobConfig == null) {

airbyte-server/src/main/kotlin/io/airbyte/server/apis/controllers/CommandApiController.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class CommandApiController(
9292
id(commandGetRequest?.id)
9393
command?.let {
9494
commandType(it.commandType)
95-
commandInput(it.commandInput)
95+
commandInput(it.commandInput.toString())
9696
workspaceId(it.workspaceId)
9797
workloadId(it.workloadId)
9898
organizationId(it.organizationId)

airbyte-server/src/main/kotlin/io/airbyte/server/apis/controllers/JobsApiController.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.airbyte.api.model.generated.ConnectionIdRequestBody
1111
import io.airbyte.api.model.generated.ConnectionJobRequestBody
1212
import io.airbyte.api.model.generated.DeleteStreamResetRecordsForJobRequest
1313
import io.airbyte.api.model.generated.GetWebhookConfigRequest
14+
import io.airbyte.api.model.generated.GetWebhookConfigResponse
1415
import io.airbyte.api.model.generated.InternalOperationResult
1516
import io.airbyte.api.model.generated.JobCreate
1617
import io.airbyte.api.model.generated.JobDebugInfoRead
@@ -31,6 +32,7 @@ import io.airbyte.api.problems.throwable.generated.ApiNotImplementedInOssProblem
3132
import io.airbyte.commons.auth.AuthRoleConstants
3233
import io.airbyte.commons.auth.generated.Intent
3334
import io.airbyte.commons.auth.permissions.RequiresIntent
35+
import io.airbyte.commons.json.Jsons
3436
import io.airbyte.commons.server.handlers.JobHistoryHandler
3537
import io.airbyte.commons.server.handlers.JobInputHandler
3638
import io.airbyte.commons.server.handlers.JobsHandler
@@ -143,9 +145,11 @@ open class JobsApiController(
143145
@Post("/getWebhookConfig")
144146
@Secured(AuthRoleConstants.READER, AuthRoleConstants.WORKSPACE_READER, AuthRoleConstants.ORGANIZATION_READER)
145147
@ExecuteOn(AirbyteTaskExecutors.IO)
146-
override fun getWebhookConfig(getWebhookConfigRequest: GetWebhookConfigRequest): Any? =
148+
override fun getWebhookConfig(
149+
@Body getWebhookConfigRequest: GetWebhookConfigRequest,
150+
): GetWebhookConfigResponse? =
147151
execute {
148-
jobInputHandler.getJobWebhookConfig(getWebhookConfigRequest.jobId)
152+
GetWebhookConfigResponse().value(Jsons.serialize(jobInputHandler.getJobWebhookConfig(getWebhookConfigRequest.jobId)))
149153
}
150154

151155
@Post("/job_failure")

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,9 @@ public Boolean isWorkspaceTombstone(UUID connectionId) {
320320
public GetWebhookConfigOutput getWebhookConfig(GetWebhookConfigInput input) {
321321
try {
322322
final JobWebhookConfig jobWebhookConfig = Jsons.deserialize(
323-
airbyteApiClient.getJobsApi().getWebhookConfig(new GetWebhookConfigRequest(input.getJobId())).toString(), JobWebhookConfig.class);
323+
airbyteApiClient.getJobsApi().getWebhookConfig(new GetWebhookConfigRequest(input.getJobId())).getValue(), JobWebhookConfig.class);
324324
return new GetWebhookConfigOutput(jobWebhookConfig.getOperationSequence(), jobWebhookConfig.getWebhookOperationConfigs());
325-
} catch (IOException e) {
325+
} catch (Exception e) {
326326
log.warn("Fail to get the webhook config.", e);
327327
throw new RuntimeException(e);
328328
}

airbyte-workers/src/test/kotlin/io/airbyte/workers/commands/ReplicationCommandTest.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ class ReplicationCommandTest {
9999
commandType = "replicate",
100100
commandInput =
101101
Jsons
102-
.jsonNode(
102+
.serialize(
103103
ReplicationCommandApiInput.ReplicationApiInput(connectionId, jobId.toString(), attemptId, CatalogDiff()),
104-
).toString(),
104+
),
105105
workloadId = workloadId,
106106
organizationId = organizationId,
107107
createdAt = java.time.OffsetDateTime.now(),
@@ -161,9 +161,9 @@ class ReplicationCommandTest {
161161
commandType = "replicate",
162162
commandInput =
163163
Jsons
164-
.jsonNode(
164+
.serialize(
165165
ReplicationCommandApiInput.ReplicationApiInput(connectionId, jobId.toString(), attemptId, CatalogDiff()),
166-
).toString(),
166+
),
167167
workloadId = workloadId,
168168
organizationId = organizationId,
169169
createdAt = java.time.OffsetDateTime.now(),
@@ -232,9 +232,9 @@ class ReplicationCommandTest {
232232
commandType = "replicate",
233233
commandInput =
234234
Jsons
235-
.jsonNode(
235+
.serialize(
236236
ReplicationCommandApiInput.ReplicationApiInput(connectionId, jobId.toString(), attemptId, CatalogDiff()),
237-
).toString(),
237+
),
238238
workloadId = workloadId,
239239
organizationId = organizationId,
240240
createdAt = java.time.OffsetDateTime.now(),
@@ -275,9 +275,9 @@ class ReplicationCommandTest {
275275
commandType = "replicate",
276276
commandInput =
277277
Jsons
278-
.jsonNode(
278+
.serialize(
279279
ReplicationCommandApiInput.ReplicationApiInput(connectionId, jobId.toString(), attemptId, CatalogDiff()),
280-
).toString(),
280+
),
281281
workloadId = workloadId,
282282
organizationId = organizationId,
283283
createdAt = java.time.OffsetDateTime.now(),

0 commit comments

Comments
 (0)