
Commit b19689f

Merge pull request #8 from edsonwade/feature/Kafka-consumer-replaying-data
Add method to serialize log_params field and handle bulk requests
2 parents 717a6b5 + 8f911fc commit b19689f

File tree

6 files changed: +224, -26 lines changed


.idea/inspectionProfiles/Project_Default.xml

Lines changed: 7 additions & 0 deletions
(Generated file; diff not rendered.)

docs/consumer/Consumer-Strategy-Offset-Commit.md

Lines changed: 23 additions & 1 deletion
@@ -48,4 +48,26 @@ public static void main(String[] args) {

In this example, the consumer commits the offsets manually after processing each batch of messages, ensuring that the offsets accurately reflect the processing point.
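A minimal sketch of that manual-commit pattern (the full example lives earlier in this file, outside the diff), assuming a hypothetical topic `demo_topic` and group id `manual-commit-group`:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");     // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto-commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo_topic")); // hypothetical topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> processRecord(record.value())); // process first...
                consumer.commitSync(); // ...then commit, so offsets reflect the processing point
            }
        }
    }

    private static void processRecord(String value) {
        System.out.println("Processed: " + value);
    }
}
```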
## If we commit offsets before processing the data

- We are in the at-most-once delivery semantics scenario.

## If we commit offsets after processing the data

- We are in the at-least-once delivery semantics scenario.

## If we process the data and commit offsets exactly once

- We are in the exactly-once delivery semantics scenario.

### If we don't want duplicates in our target database

We need to make sure the processing of the data is idempotent.

### What's a generic unique ID I can use for messages received from a consumer?

Topic + Partition + Offset

## If I want to replay data for a consumer, I should:

- Reset the offsets of the consumer group to the beginning of the topic (see the sketch below).
- Set `auto.offset.reset` to `earliest`.
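A minimal sketch combining both points, assuming a hypothetical topic `demo_topic` and group id `replay-group`: rewind the group with `seekToBeginning`, then derive the Topic + Partition + Offset ID for each record so downstream writes stay idempotent:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReplayWithUniqueIds {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-group");            // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo_topic")); // hypothetical topic

            consumer.poll(Duration.ofMillis(100)); // first poll joins the group and receives partitions
            consumer.seekToBeginning(consumer.assignment()); // rewind every assigned partition

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> {
                // Topic + Partition + Offset: a generic unique id for idempotent writes
                String uniqueId = record.topic() + "_" + record.partition() + "_" + record.offset();
                System.out.printf("id=%s value=%s%n", uniqueId, record.value());
            });
        }
    }
}
```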
Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
Kafka "consumer replay" lets you reprocess messages from a given point in the log. This can be useful for correcting errors or reprocessing data with updated logic.

Here is a basic example of how to configure a Kafka consumer to reprocess data from a specific offset:

1. **Configure the consumer to read from the beginning of the log**:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@SuppressWarnings("all")
public class KafkaConsumerReplay {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Read from the beginning of the log

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("your-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
                // Process the message
            }
        }
    }
}
```

2. **Configure the consumer to throw an exception when no committed offset exists**:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

@SuppressWarnings("all")
public class KafkaConsumerConfig {

    public static Properties getConsumerProps(String offsetReset) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); // earliest, latest, or none
        return props;
    }

    public static void main(String[] args) {
        Properties noneProps = getConsumerProps("none");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(noneProps)) {
            // Use the consumer as needed
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

In this example, the consumer is configured either to read from the beginning of the log (`earliest`) or to throw an exception when no committed offset exists (`none`). This lets you replay messages or handle the missing-offset case explicitly.
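With `none`, the first `poll` fails when the consumer group has no committed offsets. A minimal sketch of catching that case, reusing `getConsumerProps` from the example above (the topic name is hypothetical):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;

import java.time.Duration;
import java.util.Collections;

public class NoneResetHandling {

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(KafkaConsumerConfig.getConsumerProps("none"))) {
            consumer.subscribe(Collections.singletonList("your-topic")); // hypothetical topic
            consumer.poll(Duration.ofMillis(1000));
        } catch (NoOffsetForPartitionException e) {
            // No committed offset for this group: decide explicitly where to start
            System.err.println("No committed offsets found: " + e.getMessage());
        }
    }
}
```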
## Configuring Consumer Behavior with Respect to Offsets

To configure how the Kafka consumer behaves with respect to offsets, use the `auto.offset.reset` property. The three supported values are:

1. **`earliest`**: the consumer reads from the beginning of the log.
2. **`latest`**: the consumer reads from the end of the log.
3. **`none`**: the consumer throws an exception if no committed offset exists for the group.

Here is an example of how to set this property on the Kafka consumer:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class KafkaConsumerConfig {

    public static Properties getConsumerProps(String offsetReset) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); // earliest, latest, or none
        return props;
    }

    public static void main(String[] args) {
        // Example usage
        Properties earliestProps = getConsumerProps("earliest");
        Properties latestProps = getConsumerProps("latest");
        Properties noneProps = getConsumerProps("none");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(earliestProps);
        // Use the consumer as needed
    }
}
```

In this example, the `auto.offset.reset` property is set to `earliest`, `latest`, or `none`, depending on the desired behavior.
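The examples above only control where a consumer starts when no committed offset exists. To replay from a specific offset, as mentioned at the top of this page, the consumer can be assigned a partition and seeked explicitly. A minimal sketch, with hypothetical topic, partition, and offset values:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerSeekToOffset {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment gives direct control over the partition and offset
            TopicPartition partition = new TopicPartition("your-topic", 0); // hypothetical topic/partition
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 42L); // hypothetical replay offset

            consumer.poll(Duration.ofMillis(1000))
                    .forEach(record -> System.out.printf("Offset: %d, Value: %s%n",
                            record.offset(), record.value()));
        }
    }
}
```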

kafka-basic/src/main/java/code/with/vanilson/consumidor/ConsumerDemoWithShutdown.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -54,7 +54,7 @@ public static void main(String[] args) {
      *
      * @param consumers - the Kafka consumer.
      */
-    static void pollMessages(KafkaConsumer<String, String> consumers) {
+    public static void pollMessages(KafkaConsumer<String, String> consumers) {
         try {
             // Subscribe the Kafka consumer to the Kafka topic named "demo_java".
             consumers.subscribe(List.of("demo_java"));
```
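Widening `pollMessages` to `public` presumably lets other modules reuse this poll loop (this commit also adds `kafka-basic` as a dependency of `kafka-consumer-opensearch`). A hypothetical call site, with assumed broker and group settings:

```java
import code.with.vanilson.consumidor.ConsumerDemoWithShutdown;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class ReplayRunner {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("group.id", "demo-group");              // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // pollMessages is now public, so another module can drive the same poll loop
        ConsumerDemoWithShutdown.pollMessages(new KafkaConsumer<>(props));
    }
}
```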

kafka-consumer-opensearch/pom.xml

Lines changed: 15 additions & 13 deletions
```diff
@@ -36,6 +36,17 @@
             <version>${slf4j.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.20.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>2.20.0</version>
+        </dependency>
+
         <!--Okhttp-->
         <dependency>
             <groupId>com.squareup.okhttp3</groupId>
@@ -64,21 +75,12 @@
             <artifactId>gson</artifactId>
             <version>2.8.9</version>
         </dependency>
+        <dependency>
+            <groupId>code.with.vanilson</groupId>
+            <artifactId>kafka-basic</artifactId>
+        </dependency>
     </dependencies>
 
-    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>code.with.vanilson</groupId>
-                <artifactId>kafka-basic</artifactId>
-                <version>1.0-SNAPSHOT</version>
-                <scope>compile</scope>
-            </dependency>
-        </dependencies>
-
-    </dependencyManagement>
-
 
     <build>
         <plugins>
```

kafka-consumer-opensearch/src/main/java/code/with/vanilson/OpenSearchConsumer.java

Lines changed: 57 additions & 11 deletions
```diff
@@ -1,9 +1,13 @@
 package code.with.vanilson;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
 import org.opensearch.OpenSearchStatusException;
 import org.opensearch.action.bulk.BulkRequest;
 import org.opensearch.action.bulk.BulkResponse;
@@ -38,7 +42,21 @@ public static void main(String[] args) {
         try (RestHighLevelClient openSearchClient = OpenSearchClientConsumer.createOpenSearchClient();
              KafkaConsumer<String, String> consumer = KafkaConsumerClient.createKafkaConsumer()) {
 
-            // Check if the index exists, create it if it doesn't
+            final Thread mainThread = Thread.currentThread();
+
+            // Add shutdown hook
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                log.info("Detected a shutdown consumer, let's exit gracefully.");
+                consumer.wakeup();
+                try {
+                    mainThread.join(); // Wait until the main consumer loop completes
+                } catch (InterruptedException e) {
+                    log.error("Error while shutdown consumer", e);
+                    Thread.currentThread().interrupt();
+                }
+            }));
+
+            // Check and create OpenSearch index if necessary
             if (!openSearchClient.indices().exists(new GetIndexRequest(WIKIMEDIA_INDEX), RequestOptions.DEFAULT)) {
                 CreateIndexRequest createIndexRequest = new CreateIndexRequest(WIKIMEDIA_INDEX);
                 openSearchClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
@@ -47,10 +65,10 @@ public static void main(String[] args) {
                 log.info("Index already exists: {}", WIKIMEDIA_INDEX);
             }
 
-            // Subscribe to the topic
+            // Subscribe to Kafka topic
             consumer.subscribe(Collections.singleton(TOPIC));
 
-            // Poll for new data
+            // Poll loop
             while (true) {
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
                 int recordCount = records.count();
@@ -81,43 +99,71 @@ public static void main(String[] args) {
                     });
                 }
 
-                // Sleep for 1000 milliseconds
                 try {
-                    Thread.sleep(1000);
+                    Thread.sleep(1000); // Control thread sleep interval
                 } catch (InterruptedException e) {
                     log.error("Thread sleep interrupted: {}", e.getMessage());
                     Thread.currentThread().interrupt();
                 }
             }
         } catch (IOException e) {
             log.error("An error occurred: {}", e.getMessage());
+        } catch (WakeupException e) {
+            log.info("Consumer is starting to shutdown");
+        } catch (Exception e) {
+            log.error("Unexpected exception to the consumer", e);
         }
+        log.info("Consumer has been closed");
     }
 
     /**
-     * Adds a record to the bulk request.
+     * Adds a record to the bulk request after normalizing the log_params field.
      *
-     * @param record      the consumer record
-     * @param bulkRequest the bulk request
+     * @param record      the consumer record containing the original JSON string
+     * @param bulkRequest the bulk request to which the index request will be added
      */
     private static void addRecordToBulkRequest(ConsumerRecord<String, String> record, BulkRequest bulkRequest) {
         try {
-            String id = extractIdFromJsonValue(record.value());
-            IndexRequest indexRequest = new IndexRequest(WIKIMEDIA_INDEX)
-                    .source(record.value(), XContentType.JSON)
+            String originalString = record.value();
+            String id = extractIdFromJsonValue(originalString);
+            String normalizedString = serializeLogParams(originalString);
+
+            IndexRequest indexRequest = new IndexRequest("wikimedia")
+                    .source(normalizedString, XContentType.JSON)
                     .id(id);
             bulkRequest.add(indexRequest);
         } catch (OpenSearchStatusException e) {
             log.error("An error occurred while adding record to bulk request: {}", e.getMessage());
         }
     }
 
+    /**
+     * Serializes the log_params field in the original JSON string.
+     *
+     * @param originalString the original JSON string
+     * @return the modified JSON string with the log_params field serialized
+     */
+    private static String serializeLogParams(String originalString) {
+        JsonObject jsonObject = JsonParser.parseString(originalString).getAsJsonObject();
+
+        if (jsonObject.has("log_params")) {
+            // Get the log_params as an object (no serialization into a string)
+            JsonElement logParams = jsonObject.get("log_params");
+
+            // Add log_params back as an object
+            jsonObject.add("log_params", logParams);
+        }
+
+        return jsonObject.toString(); // Return the modified JSON string
+    }
+
     /**
      * Extracts the ID from the JSON value.
      *
      * @param json the JSON string
      * @return the extracted ID
     */
+
     private static String extractIdFromJsonValue(String json) {
         return JsonParser.parseString(json)
                 .getAsJsonObject()
```
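As committed, `serializeLogParams` reads `log_params` and adds the same element straight back, so the field itself is unchanged and only the whole document is re-serialized by `toString()`. If the intent, per the method name and the commit message, is to flatten `log_params` into a JSON string (for example to avoid index mapping conflicts when the field's type varies between events), a minimal sketch of that variant, offered purely as an assumption about the intent:

```java
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class LogParamsSerializer {

    /**
     * Sketch of the presumed intent, not the committed behavior: replace
     * log_params with its string form so the indexed field always has one type.
     */
    static String serializeLogParamsAsString(String originalJson) {
        JsonObject jsonObject = JsonParser.parseString(originalJson).getAsJsonObject();

        if (jsonObject.has("log_params")) {
            JsonElement logParams = jsonObject.get("log_params");
            // addProperty stores the element's JSON text as a plain string value
            jsonObject.addProperty("log_params", logParams.toString());
        }

        return jsonObject.toString();
    }

    public static void main(String[] args) {
        String sample = "{\"id\":1,\"log_params\":{\"userid\":42}}"; // hypothetical event
        System.out.println(serializeLogParamsAsString(sample));
        // Prints: {"id":1,"log_params":"{\"userid\":42}"}
    }
}
```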
