Skip to content

Commit 049854a

Browse files
committed
chore: do not send DA destination sync modes to sources (#16733)
1 parent dcb3c8c commit 049854a

File tree

9 files changed

+133
-26
lines changed

9 files changed

+133
-26
lines changed

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/DefaultProtocolSerializer.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,42 +4,74 @@
44

55
package io.airbyte.commons.protocol;
66

7+
import static io.micronaut.core.util.CollectionUtils.setOf;
8+
79
import io.airbyte.commons.enums.Enums;
810
import io.airbyte.commons.json.Jsons;
911
import io.airbyte.config.ConfiguredAirbyteCatalog;
1012
import io.airbyte.config.ConfiguredAirbyteStream;
1113
import io.airbyte.config.DestinationSyncMode;
1214
import io.airbyte.config.helpers.ProtocolConverters;
15+
import java.util.Set;
1316

1417
/**
1518
* Default JSON serialization for the Airbyte Protocol.
1619
*/
1720
public class DefaultProtocolSerializer implements ProtocolSerializer {
1821

22+
private static final Set<DestinationSyncMode> legacyDestinationSyncModes = setOf(
23+
DestinationSyncMode.APPEND,
24+
DestinationSyncMode.APPEND_DEDUP,
25+
DestinationSyncMode.OVERWRITE,
26+
DestinationSyncMode.OVERWRITE_DEDUP);
27+
1928
@Override
20-
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog, final boolean supportsRefreshes) {
29+
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog,
30+
final boolean supportsRefreshes,
31+
final SerializationTarget target) {
2132
// Copy to avoid mutating input
2233
final ConfiguredAirbyteCatalog clonedCatalog = Jsons.clone(configuredAirbyteCatalog);
23-
replaceDestinationSyncModes(clonedCatalog, supportsRefreshes);
34+
replaceDestinationSyncModes(clonedCatalog, supportsRefreshes, target);
2435

2536
return Jsons.serialize(toProtocol(clonedCatalog));
2637
}
2738

28-
private void replaceDestinationSyncModes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog, final boolean supportsRefreshes) {
39+
private void replaceDestinationSyncModes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog,
40+
final boolean supportsRefreshes,
41+
final SerializationTarget target) {
2942
// Ensure we convert destination sync modes to the expected ones
3043
for (final ConfiguredAirbyteStream stream : configuredAirbyteCatalog.getStreams()) {
31-
if (supportsRefreshes) {
32-
if (DestinationSyncMode.OVERWRITE.equals(stream.getDestinationSyncMode())) {
44+
if (target == SerializationTarget.SOURCE) {
45+
// New destination sync modes were added to for data activation destinations.
46+
// However, because DestinationSyncModes are an enum and currently passed to sources even though the
47+
// value is irrelevant to the sources,
48+
// they end up failing to deserialize the configured catalog.
49+
// This hides the new sync modes from all sources, until we effectively split the configured catalog
50+
// into a source and destination version.
51+
if (legacyDestinationSyncModes.contains(stream.getDestinationSyncMode())) {
52+
stream.setDestinationSyncMode(getNonDataActivationDestinationSyncMode(stream.getDestinationSyncMode(), supportsRefreshes));
53+
} else {
3354
stream.setDestinationSyncMode(DestinationSyncMode.APPEND);
34-
} else if (DestinationSyncMode.OVERWRITE_DEDUP.equals(stream.getDestinationSyncMode())) {
35-
stream.setDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP);
3655
}
3756
} else {
38-
if (DestinationSyncMode.OVERWRITE_DEDUP.equals(stream.getDestinationSyncMode())) {
39-
stream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
40-
}
57+
stream.setDestinationSyncMode(getNonDataActivationDestinationSyncMode(stream.getDestinationSyncMode(), supportsRefreshes));
58+
}
59+
}
60+
}
61+
62+
private DestinationSyncMode getNonDataActivationDestinationSyncMode(final DestinationSyncMode syncMode, Boolean supportsRefreshes) {
63+
if (supportsRefreshes) {
64+
if (DestinationSyncMode.OVERWRITE.equals(syncMode)) {
65+
return DestinationSyncMode.APPEND;
66+
} else if (DestinationSyncMode.OVERWRITE_DEDUP.equals(syncMode)) {
67+
return DestinationSyncMode.APPEND_DEDUP;
68+
}
69+
} else {
70+
if (DestinationSyncMode.OVERWRITE_DEDUP.equals(syncMode)) {
71+
return DestinationSyncMode.OVERWRITE;
4172
}
4273
}
74+
return syncMode;
4375
}
4476

4577
/**

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/ProtocolSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
*/
1212
public interface ProtocolSerializer {
1313

14-
String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog, final boolean supportsRefreshes);
14+
String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog, final boolean supportsRefreshes, final SerializationTarget target);
1515

1616
}

airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/VersionedProtocolSerializer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ public VersionedProtocolSerializer(final ConfiguredAirbyteCatalogMigrator config
2727
}
2828

2929
@Override
30-
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog, final boolean supportsRefreshes) {
30+
public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog,
31+
final boolean supportsRefreshes,
32+
final SerializationTarget target) {
3133
// TODO: rework the migration part to support different protocol version. This currently works
3234
// because we only have one major.
33-
return new DefaultProtocolSerializer().serialize(configuredAirbyteCatalog, supportsRefreshes);
35+
return new DefaultProtocolSerializer().serialize(configuredAirbyteCatalog, supportsRefreshes, target);
3436
}
3537

3638
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.commons.protocol
6+
7+
enum class SerializationTarget {
8+
DESTINATION,
9+
ORCHESTRATOR,
10+
SOURCE,
11+
}

airbyte-commons-protocol/src/test/kotlin/io/airbyte/commons/protocol/DefaultProtocolSerializerTest.kt

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,33 @@ class DefaultProtocolSerializerTest {
2121
@Test
2222
fun `verify we do not write certain destination sync modes serialization with refresh support`() {
2323
val serializer = DefaultProtocolSerializer()
24-
verifyDestinationSyncModesOverrides(serializer, true)
24+
verifyDestinationSyncModesOverrides(serializer, true, SerializationTarget.DESTINATION)
2525
}
2626

2727
@Test
2828
fun `verify we remain backward compatible for destination sync modes when refreshes are not supported`() {
2929
val serializer = DefaultProtocolSerializer()
30-
verifyDestinationSyncModesOverrides(serializer, false)
30+
verifyDestinationSyncModesOverrides(serializer, false, SerializationTarget.DESTINATION)
31+
}
32+
33+
@Test
34+
fun `verify we do not write data activation destination sync modes to the sources`() {
35+
val serializer = DefaultProtocolSerializer()
36+
verifyDestinationSyncModesOverrides(serializer, true, SerializationTarget.SOURCE)
3137
}
3238

3339
companion object {
3440
fun verifyDestinationSyncModesOverrides(
3541
serializer: ProtocolSerializer,
3642
supportRefreshes: Boolean,
43+
target: SerializationTarget,
3744
) {
3845
val appendStreamName = "append"
3946
val overwriteStreamName = "overwrite"
4047
val appendDedupStreamName = "append_dedup"
4148
val overwriteDedupStreamName = "overwrite_dedup"
49+
val updateStreamName = "update"
50+
val softDeleteStreamName = "soft_delete"
4251

4352
val configuredCatalog =
4453
ConfiguredAirbyteCatalog()
@@ -48,6 +57,8 @@ class DefaultProtocolSerializerTest {
4857
ConfiguredAirbyteStream(getAirbyteStream(overwriteStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE),
4958
ConfiguredAirbyteStream(getAirbyteStream(appendDedupStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND_DEDUP),
5059
ConfiguredAirbyteStream(getAirbyteStream(overwriteDedupStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE_DEDUP),
60+
ConfiguredAirbyteStream(getAirbyteStream(updateStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.UPDATE),
61+
ConfiguredAirbyteStream(getAirbyteStream(softDeleteStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.SOFT_DELETE),
5162
),
5263
)
5364
val frozenConfiguredCatalog = Jsons.clone(configuredCatalog)
@@ -96,10 +107,44 @@ class DefaultProtocolSerializerTest {
96107
).withSyncMode(io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH)
97108
.withDestinationSyncMode(if (supportRefreshes) ProtocolDestinationSyncMode.APPEND_DEDUP else ProtocolDestinationSyncMode.OVERWRITE)
98109
.withIncludeFiles(false),
110+
ProtocolConfiguredAirbyteStream()
111+
.withStream(
112+
ProtocolAirbyteStream()
113+
.withName(
114+
updateStreamName,
115+
).withJsonSchema(Jsons.emptyObject())
116+
.withSupportedSyncModes(listOf(io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH)),
117+
).withSyncMode(io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH)
118+
.withDestinationSyncMode(
119+
if (target !=
120+
SerializationTarget.SOURCE
121+
) {
122+
ProtocolDestinationSyncMode.UPDATE
123+
} else {
124+
ProtocolDestinationSyncMode.APPEND
125+
},
126+
).withIncludeFiles(false),
127+
ProtocolConfiguredAirbyteStream()
128+
.withStream(
129+
ProtocolAirbyteStream()
130+
.withName(
131+
softDeleteStreamName,
132+
).withJsonSchema(Jsons.emptyObject())
133+
.withSupportedSyncModes(listOf(io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH)),
134+
).withSyncMode(io.airbyte.protocol.models.v0.SyncMode.FULL_REFRESH)
135+
.withDestinationSyncMode(
136+
if (target !=
137+
SerializationTarget.SOURCE
138+
) {
139+
ProtocolDestinationSyncMode.SOFT_DELETE
140+
} else {
141+
ProtocolDestinationSyncMode.APPEND
142+
},
143+
).withIncludeFiles(false),
99144
),
100145
)
101146

102-
val serializedCatalog = serializer.serialize(configuredCatalog, supportRefreshes)
147+
val serializedCatalog = serializer.serialize(configuredCatalog, supportRefreshes, target)
103148
val actualCatalog = Jsons.deserialize(serializedCatalog, io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog::class.java)
104149

105150
// Verify we serialized what's expected

airbyte-commons-protocol/src/test/kotlin/io/airbyte/commons/protocol/VersionedProtocolSerializerTest.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ class VersionedProtocolSerializerTest {
1212
@Test
1313
fun `verify we do not write certain destination sync modes serialization with refresh support`() {
1414
val serializer = VersionedProtocolSerializer(ConfiguredAirbyteCatalogMigrator(listOf()), AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)
15-
verifyDestinationSyncModesOverrides(serializer, true)
15+
verifyDestinationSyncModesOverrides(serializer, true, SerializationTarget.DESTINATION)
1616
}
1717

1818
@Test
1919
fun `verify we remain backward compatible for destination sync modes when refreshes are not supported`() {
2020
val serializer = VersionedProtocolSerializer(ConfiguredAirbyteCatalogMigrator(listOf()), AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)
21-
verifyDestinationSyncModesOverrides(serializer, false)
21+
verifyDestinationSyncModesOverrides(serializer, false, SerializationTarget.DESTINATION)
22+
}
23+
24+
@Test
25+
fun `verify we do not write data activation destination sync modes to the sources`() {
26+
val serializer = VersionedProtocolSerializer(ConfiguredAirbyteCatalogMigrator(listOf()), AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)
27+
verifyDestinationSyncModesOverrides(serializer, true, SerializationTarget.SOURCE)
2228
}
2329
}

airbyte-data/src/test/kotlin/io/airbyte/data/services/impls/jooq/DbConverterTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package io.airbyte.data.services.impls.jooq
77
import com.google.common.io.Resources
88
import io.airbyte.commons.json.Jsons
99
import io.airbyte.commons.protocol.DefaultProtocolSerializer
10+
import io.airbyte.commons.protocol.SerializationTarget
1011
import io.airbyte.config.Notification
1112
import io.airbyte.config.Notification.NotificationType
1213
import io.airbyte.config.NotificationSettings
@@ -51,7 +52,7 @@ internal class DbConverterTest {
5152
protocol: ProtocolConfiguredAirbyteCatalog,
5253
) {
5354
val internalToProtocol: ProtocolConfiguredAirbyteCatalog =
54-
parseConfiguredAirbyteCatalogAsProtocol(DefaultProtocolSerializer().serialize(internal, false))
55+
parseConfiguredAirbyteCatalogAsProtocol(DefaultProtocolSerializer().serialize(internal, false, SerializationTarget.SOURCE))
5556

5657
// This is to ease the defaults in the comparison. The frozen catalogs do not have includeFiles,
5758
// we default to false in the new model

airbyte-workload-init-container/src/main/kotlin/input/ReplicationHydrationProcessor.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.initContainer.input
66

77
import io.airbyte.commons.protocol.ProtocolSerializer
8+
import io.airbyte.commons.protocol.SerializationTarget
89
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType
910
import io.airbyte.initContainer.serde.ObjectSerializer
1011
import io.airbyte.initContainer.system.FileClient
@@ -65,7 +66,7 @@ class ReplicationHydrationProcessor(
6566
logger.info { "Writing source inputs..." }
6667
fileClient.writeInputFile(
6768
FileConstants.CATALOG_FILE,
68-
protocolSerializer.serialize(hydrated.catalog, false),
69+
protocolSerializer.serialize(hydrated.catalog, false, SerializationTarget.SOURCE),
6970
SOURCE_DIR,
7071
)
7172

@@ -97,7 +98,7 @@ class ReplicationHydrationProcessor(
9798
// Write original catalog as is
9899
fileClient.writeInputFile(
99100
FileConstants.CATALOG_FILE,
100-
protocolSerializer.serialize(hydrated.catalog, hydrated.destinationSupportsRefreshes),
101+
protocolSerializer.serialize(hydrated.catalog, hydrated.destinationSupportsRefreshes, SerializationTarget.DESTINATION),
101102
DEST_DIR,
102103
)
103104

@@ -118,7 +119,7 @@ class ReplicationHydrationProcessor(
118119

119120
fileClient.writeInputFile(
120121
FileConstants.CATALOG_FILE,
121-
protocolSerializer.serialize(destinationCatalog, hydrated.destinationSupportsRefreshes),
122+
protocolSerializer.serialize(destinationCatalog, hydrated.destinationSupportsRefreshes, SerializationTarget.DESTINATION),
122123
DEST_DIR,
123124
)
124125
}

airbyte-workload-init-container/src/test/kotlin/input/ReplicationHydrationProcessorTest.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.initContainer.input
66

77
import io.airbyte.commons.json.Jsons
88
import io.airbyte.commons.protocol.ProtocolSerializer
9+
import io.airbyte.commons.protocol.SerializationTarget
910
import io.airbyte.config.AirbyteStream
1011
import io.airbyte.config.ConfiguredAirbyteCatalog
1112
import io.airbyte.config.ConfiguredAirbyteStream
@@ -128,8 +129,10 @@ class ReplicationHydrationProcessorTest {
128129
every { serializer.serialize(hydrated.sourceConfiguration) } returns serializedSrcConfig
129130
every { serializer.serialize(hydrated.destinationConfiguration) } returns serializedDestConfig
130131
every { serializer.serialize(hydrated.state?.state) } returns serializedState
131-
every { protocolSerializer.serialize(hydrated.catalog, false) } returns serializedSrcCatalog
132-
every { protocolSerializer.serialize(mapper.mapCatalog(hydrated.catalog), hydrated.destinationSupportsRefreshes) } returns serializedDestCatalog
132+
every { protocolSerializer.serialize(hydrated.catalog, false, SerializationTarget.SOURCE) } returns serializedSrcCatalog
133+
every {
134+
protocolSerializer.serialize(mapper.mapCatalog(hydrated.catalog), hydrated.destinationSupportsRefreshes, SerializationTarget.DESTINATION)
135+
} returns serializedDestCatalog
133136
every {
134137
destinationCatalogGenerator.generateDestinationCatalog(any())
135138
} returns DestinationCatalogGenerator.CatalogGenerationResult(hydrated.catalog, mapOf())
@@ -142,8 +145,14 @@ class ReplicationHydrationProcessorTest {
142145
verify { fileClient.writeInputFile(FileConstants.INIT_INPUT_FILE, serializedReplInput) }
143146
verify { serializer.serialize(hydrated.sourceConfiguration) }
144147
verify { serializer.serialize(hydrated.destinationConfiguration) }
145-
verify { protocolSerializer.serialize(hydrated.catalog, false) }
146-
verify { protocolSerializer.serialize(mapper.mapCatalog(hydrated.catalog), hydrated.destinationSupportsRefreshes) }
148+
verify { protocolSerializer.serialize(hydrated.catalog, false, SerializationTarget.SOURCE) }
149+
verify {
150+
protocolSerializer.serialize(
151+
mapper.mapCatalog(hydrated.catalog),
152+
hydrated.destinationSupportsRefreshes,
153+
SerializationTarget.DESTINATION,
154+
)
155+
}
147156
verify { fileClient.writeInputFile(FileConstants.CATALOG_FILE, serializedSrcCatalog, FileConstants.SOURCE_DIR) }
148157
verify { fileClient.writeInputFile(FileConstants.CONNECTOR_CONFIG_FILE, serializedSrcConfig, FileConstants.SOURCE_DIR) }
149158
verify(exactly = timesStateFileWritten) { fileClient.writeInputFile(FileConstants.INPUT_STATE_FILE, serializedState, FileConstants.SOURCE_DIR) }

0 commit comments

Comments
 (0)