Skip to content

Commit ce83224

Browse files
update raw payload docID to include shardID
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent f3684ff commit ce83224

File tree

4 files changed

+24
-17
lines changed

4 files changed

+24
-17
lines changed

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ private DefaultStreamPoller(
114114
pollTimeout,
115115
pointerBasedLagUpdateIntervalMs,
116116
ingestionEngine.config().getIndexSettings(),
117-
IngestionMessageMapper.create(mapperType.getName())
117+
IngestionMessageMapper.create(mapperType.getName(), shardId)
118118
);
119119
}
120120

server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,16 @@ public static MapperType fromString(String value) {
6868
* Factory method to create a mapper instance based on type string.
6969
*
7070
* @param mapperTypeString the type of mapper to create as a string
71+
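* @param shardId the ID of the shard the mapper is created for; passed to the raw payload mapper, which includes it in document IDs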
7172
* @return the mapper instance
7273
*/
73-
static IngestionMessageMapper create(String mapperTypeString) {
74+
static IngestionMessageMapper create(String mapperTypeString, int shardId) {
7475
MapperType mapperType = MapperType.fromString(mapperTypeString);
7576
switch (mapperType) {
7677
case DEFAULT:
7778
return new DefaultIngestionMessageMapper();
7879
case RAW_PAYLOAD:
79-
return new RawPayloadIngestionMessageMapper();
80+
return new RawPayloadIngestionMessageMapper(shardId);
8081
default:
8182
throw new IllegalArgumentException("Unknown mapper type: " + mapperType);
8283
}

server/src/main/java/org/opensearch/indices/pollingingest/mappers/RawPayloadIngestionMessageMapper.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,28 @@
2222

2323
/**
2424
* Mapper implementation that uses the entire message payload as the document source.
25-
* The message pointer (Kafka offset, Kinesis sequence number, etc) is used as the document ID, and the operation type is "index".
25+
* The message pointer (Kafka offset, Kinesis sequence number, etc.), together with the shard ID, is used as the document ID, and the operation type is "index".
26+
* For the document ID, the shard pointer is prefixed with the shard ID (as {@code shardId-pointer}) to ensure uniqueness across all shards.
2627
*
2728
* <p> Note that raw payload will not support document versioning. Eventually consistent view of the documents can be expected
2829
* if there is a shard recovery resulting in message replay.
2930
*/
3031
public class RawPayloadIngestionMessageMapper implements IngestionMessageMapper {
3132

3233
private static final String OP_TYPE_INDEX = "index";
34+
private final int shardId;
35+
36+
public RawPayloadIngestionMessageMapper(int shardId) {
37+
this.shardId = shardId;
38+
}
3339

3440
@Override
3541
public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException {
3642
// Parse the raw payload - this will be the _source content
3743
Map<String, Object> sourceMap = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
3844

39-
// Use pointer as the document ID
40-
String id = pointer.asString();
45+
// Use shard ID prefix + pointer as the document ID to ensure uniqueness across shards
46+
String id = shardId + "-" + pointer.asString();
4147

4248
// No auto-generated ID timestamp since the ID is built from the shard ID and pointer
4349
long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;

server/src/test/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapperTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ public void testMapperTypeFromStringInvalid() {
3636
}
3737

3838
public void testMapperCreation() {
39-
IngestionMessageMapper defaultMapper = IngestionMessageMapper.create("default");
39+
IngestionMessageMapper defaultMapper = IngestionMessageMapper.create("default", 0);
4040
assertNotNull(defaultMapper);
4141
assertTrue(defaultMapper instanceof DefaultIngestionMessageMapper);
4242

43-
IngestionMessageMapper rawPayloadMapper = IngestionMessageMapper.create("raw_payload");
43+
IngestionMessageMapper rawPayloadMapper = IngestionMessageMapper.create("raw_payload", 0);
4444
assertNotNull(rawPayloadMapper);
4545
assertTrue(rawPayloadMapper instanceof RawPayloadIngestionMessageMapper);
4646

47-
expectThrows(IllegalArgumentException.class, () -> IngestionMessageMapper.create("unknown"));
47+
expectThrows(IllegalArgumentException.class, () -> IngestionMessageMapper.create("unknown", 0));
4848
}
4949

5050
public void testDefaultMapperWithIdPresent() {
@@ -113,7 +113,7 @@ public void testDefaultMapperWithVersion() {
113113
}
114114

115115
public void testRawPayloadMapper() {
116-
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper();
116+
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(0);
117117
String payload = "{\"name\":\"alice\",\"age\":30,\"city\":\"Seattle\"}";
118118
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
119119

@@ -129,8 +129,8 @@ public void testRawPayloadMapper() {
129129

130130
Map<String, Object> parsedMap = result.parsedPayloadMap();
131131

132-
// Verify _id is set to pointer value
133-
assertEquals("100", parsedMap.get("_id"));
132+
// Verify _id is set to shard ID + pointer value
133+
assertEquals("0-100", parsedMap.get("_id"));
134134

135135
// Verify _op_type is set to "index"
136136
assertEquals("index", parsedMap.get("_op_type"));
@@ -151,7 +151,7 @@ public void testRawPayloadMapper() {
151151
}
152152

153153
public void testRawPayloadMapperWithComplexObject() {
154-
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper();
154+
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(1);
155155
String payload = "{\"user\":{\"name\":\"bob\",\"email\":\"[email protected]\"},\"tags\":[\"tag1\",\"tag2\"],\"count\":42}";
156156
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
157157

@@ -163,7 +163,7 @@ public void testRawPayloadMapperWithComplexObject() {
163163
assertNotNull(result);
164164
Map<String, Object> parsedMap = result.parsedPayloadMap();
165165

166-
assertEquals("200", parsedMap.get("_id"));
166+
assertEquals("1-200", parsedMap.get("_id"));
167167
assertEquals("index", parsedMap.get("_op_type"));
168168

169169
@SuppressWarnings("unchecked")
@@ -175,7 +175,7 @@ public void testRawPayloadMapperWithComplexObject() {
175175
}
176176

177177
public void testRawPayloadMapperWithEmptyObject() {
178-
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper();
178+
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(2);
179179
String payload = "{}";
180180
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
181181

@@ -187,7 +187,7 @@ public void testRawPayloadMapperWithEmptyObject() {
187187
assertNotNull(result);
188188
Map<String, Object> parsedMap = result.parsedPayloadMap();
189189

190-
assertEquals("300", parsedMap.get("_id"));
190+
assertEquals("2-300", parsedMap.get("_id"));
191191
assertEquals("index", parsedMap.get("_op_type"));
192192

193193
@SuppressWarnings("unchecked")
@@ -207,7 +207,7 @@ public void testDefaultMapperWithInvalidJson() {
207207
}
208208

209209
public void testRawPayloadMapperWithInvalidJson() {
210-
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper();
210+
RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(3);
211211
String payload = "not a json";
212212
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
213213

0 commit comments

Comments
 (0)