Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/consumer/Consumer-Idempontence.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ O código `OpenSearchConsumer.java` é um consumidor Kafka que lê mensagens de

### Alteração Feita

Para tornar o consumidor idempotente, foi implementada uma verificação de idempotência. Isso é feito extraindo um ID único de cada mensagem Kafka e usando esse ID ao indexar a mensagem no OpenSearch. Se uma mensagem com o mesmo ID já foi indexada, ela não será indexada novamente.
Para tornar o consumidor idempotente, foi implementada uma verificação de idempotência. Isso é feito extraindo um ID único de cada mensagem, Kafka e usando esse ID ao indexar a mensagem no OpenSearch. Se uma mensagem com o mesmo ID já foi indexada, ela não será indexada novamente.

### README em Português
### README em português

```markdown
# Consumidor OpenSearch Idempotente
Expand Down
51 changes: 51 additions & 0 deletions docs/consumer/Consumer-Strategy-Offset-Commit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
### Estratégia de Commit de Offset do Consumidor

No Kafka, o commit de offset é uma parte crucial do processo de consumo de mensagens. Ele garante que o consumidor saiba
até onde ele leu no tópico Kafka, permitindo que ele retome a leitura a partir do ponto correto em caso de falhas ou
reinicializações. Existem duas principais estratégias de commit de offset:

1. **Commit Automático (Auto-Commit)**:
- O Kafka pode ser configurado para fazer o commit automático dos offsets em intervalos regulares.
- Isso é controlado pela propriedade `enable.auto.commit` no consumidor Kafka.
- Quando `enable.auto.commit` está definido como `true`, o consumidor faz o commit dos offsets automaticamente em
intervalos definidos pela propriedade `auto.commit.interval.ms`.
- Vantagem: Simplicidade, pois o Kafka gerencia os commits automaticamente.
- Desvantagem: Pode levar a duplicação de mensagens em caso de falhas, pois os commits podem não refletir exatamente
o ponto de processamento do consumidor.

2. **Commit Manual**:
- O consumidor pode ser configurado para fazer o commit dos offsets manualmente.
- Isso é feito definindo `enable.auto.commit` como `false` e chamando explicitamente o método `commitSync()`
ou `commitAsync()` do consumidor.
- `commitSync()`: Faz o commit de forma síncrona, garantindo que o commit foi bem-sucedido antes de prosseguir.
- `commitAsync()`: Faz o commit de forma assíncrona, permitindo que o consumidor continue a processar mensagens
enquanto o commit é realizado em segundo plano.
- Vantagem: Maior controle sobre quando os offsets são comitados, reduzindo a hipótese de duplicação de mensagens.
- Desvantagem: Requer mais código e gerenciamento por parte do desenvolvedor.

### Exemplo de Commit Manual

```java
public static void main(String[] args) {

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("wikimedia-recentchange"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Processa a mensagem
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// Commit manual dos offsets
consumer.commitSync();
}
} finally {
consumer.close();
}
}
```

Neste exemplo, o consumidor faz o commit dos offsets manualmente após processar cada lote de mensagens, garantindo que
os offsets reflitam com precisão o ponto de processamento.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static Properties getProperties() {
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "consumer-opensearch-demo");
props.setProperty("auto.offset.reset", "latest");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,24 @@ public static void main(String[] args) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
int recordCount = records.count();
log.info("Received {} {}", recordCount, " records");

if (recordCount == 0) {
continue;
}
// loop through the records and send them to opensearch

for (var record : records) {
// send the data to opensearch
indexRecord(record, openSearchClient);

}
// commit the offsets
consumer.commitSync();
log.info("Offsets have been committed");

}

} catch (IOException e) {
log.error("An error occurred:{} ", e.getMessage());

}

}

/**
Expand Down