From a05e78baaeb9c66b5623ec3d6994457e20c2e9ea Mon Sep 17 00:00:00 2001
From: sriram-atlan
Date: Sat, 25 Oct 2025 02:18:27 +0530
Subject: [PATCH 01/23] MLH-1477 dummy commit

---
 LOCAL_SETUP.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/LOCAL_SETUP.md b/LOCAL_SETUP.md
index 082866f8cce..f65fb5a8c3a 100644
--- a/LOCAL_SETUP.md
+++ b/LOCAL_SETUP.md
@@ -148,3 +148,4 @@ For more detailed information, refer to:
 - The build command skips tests and various checks for faster development builds
 - For production builds, remove the skip flags
 - Keep your GitHub PAT token secure and never commit it to version control
+- Adjust memory and CPU settings in Colima based on your machine's capabilities
\ No newline at end of file

From 4bdffa2efc67b74987d16aab6808174f7fb1c4b6 Mon Sep 17 00:00:00 2001
From: sriram-atlan
Date: Sat, 25 Oct 2025 02:19:41 +0530
Subject: [PATCH 02/23] MLH-1477 add branch to build

---
 .github/workflows/maven.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 6be4e7f94c6..b2e9e6737de 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -26,6 +26,7 @@ on:
       - master
       - staging
       - mlh-1477-staging
+      - mlh-1477-staging-dlq

 jobs:
   build:

From 4c5ba905eaaaa60e4fd2254e9e06aa8a431c9090 Mon Sep 17 00:00:00 2001
From: sriram-atlan
Date: Sat, 25 Oct 2025 08:59:47 +0530
Subject: [PATCH 03/23] MLH-1477 update to trigger a build

---
 LOCAL_SETUP.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/LOCAL_SETUP.md b/LOCAL_SETUP.md
index f65fb5a8c3a..07793a28fb6 100644
--- a/LOCAL_SETUP.md
+++ b/LOCAL_SETUP.md
@@ -9,6 +9,8 @@ This guide will help you set up Atlas for local development.
 - Maven 3.8+
 - Docker (via Colima for macOS)
 - Git
+- Get the source code from the AtlanHQ repository (a fork of Apache Atlas)
+- Download the zip and configuration artifacts from https://atlanhq.atlassian.net/wiki/spaces/c873aeb606dd4834a95d9909a757bfa6/pages/800424446/How+to+run+Atlas+on+the+local+machine

 ### Java Setup
 1. 
Install Java 17: From 18f37b83c4eecdcb63eacf659f24e632d67fa0e0 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Sat, 25 Oct 2025 20:57:24 +0530 Subject: [PATCH 04/23] MLH-1477 dummy commit --- LOCAL_SETUP.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/LOCAL_SETUP.md b/LOCAL_SETUP.md index 07793a28fb6..0f30a2800b0 100644 --- a/LOCAL_SETUP.md +++ b/LOCAL_SETUP.md @@ -150,4 +150,5 @@ For more detailed information, refer to: - The build command skips tests and various checks for faster development builds - For production builds, remove the skip flags - Keep your GitHub PAT token secure and never commit it to version control -- Adjust memory and CPU settings in Colima based on your machine's capabilities \ No newline at end of file +- Adjust memory and CPU settings in Colima based on your machine's capabilities +- Dummy commit \ No newline at end of file From ca3cd32479f124f65a7c0fd327fee0acb113b9ee Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 00:47:32 +0530 Subject: [PATCH 05/23] MLH-1477 add dlq replay service and api endpoint to monitor status --- .../atlas/web/rest/DLQAdminController.java | 28 ++ .../atlas/web/service/DLQReplayService.java | 418 ++++++++++++++++++ 2 files changed, 446 insertions(+) create mode 100644 webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java create mode 100644 webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java b/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java new file mode 100644 index 00000000000..ae1cd1c4f05 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java @@ -0,0 +1,28 @@ +package org.apache.atlas.web.rest; + +import org.apache.atlas.web.service.DLQReplayService; +import org.apache.atlas.web.util.Servlets; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.inject.Singleton; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.util.Map; + +@Path("dlq") +@Singleton +@Service +@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +public class DLQAdminController { + + @Autowired + private DLQReplayService dlqReplayService; + + @GET + @Path("/replay/status") + public Map getStatus() { + return dlqReplayService.getStatus(); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java new file mode 100644 index 00000000000..0f5dd865273 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -0,0 +1,418 @@ +package org.apache.atlas.web.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.BaseTransaction; +import 
org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.dlq.DLQEntry; +import org.janusgraph.diskstorage.dlq.SerializableIndexMutation; +import org.janusgraph.diskstorage.util.StandardBaseTransactionConfig; +import org.janusgraph.diskstorage.indexing.IndexEntry; +import org.janusgraph.diskstorage.indexing.IndexMutation; +import org.janusgraph.diskstorage.indexing.KeyInformation; +import org.janusgraph.diskstorage.es.ElasticSearchIndex; +import org.janusgraph.core.Cardinality; +import org.janusgraph.core.schema.Parameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import io.prometheus.client.Gauge; +import io.prometheus.client.Counter; +import javax.annotation.PostConstruct; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Service for replaying DLQ messages back to Elasticsearch. + * Uses Kafka consumer groups to ensure only one instance processes messages. + */ +@Service +public class DLQReplayService { + + private static final Logger log = LoggerFactory.getLogger(DLQReplayService.class); + + private static final Gauge DLQ_MESSAGES_REMAINING = Gauge.build() + .name("atlas_dlq_messages_remaining") + .help("Number of messages remaining to be processed in the DLQ topic") + .register(); + + private static final Counter DLQ_MESSAGES_PROCESSED = Counter.build() + .name("atlas_dlq_messages_processed_total") + .help("Total number of DLQ messages successfully processed") + .register(); + + private static final Counter DLQ_PROCESSING_ERRORS = Counter.build() + .name("atlas_dlq_processing_errors_total") + .help("Total number of errors encountered while processing DLQ messages") + .register(); + + @Value("${atlas.kafka.bootstrap.servers:localhost:9092}") + private final String bootstrapServers = "localhost:9092"; + @Value("${atlas.kafka.dlq.topic:ATLAS_ES_DLQ}") + private final String dlqTopic="ATLAS_ES_DLQ"; + @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") + private final String consumerGroupId= "atlas_dq_replay_group"; + + @Value("${atlas.dlq.poll.timeout.seconds:60}") + private int pollTimeoutSeconds; + + @Value("${atlas.dlq.poll.timeout.max.seconds:120}") + private int maxPollTimeoutSeconds; + + @Value("${atlas.dlq.backoff.initial.seconds:1}") + private int initialBackoffSeconds; + + @Value("${atlas.dlq.sleep.caught.up.multiplier:1000}") + private int caughtUpSleepMultiplier; + + @Value("${atlas.dlq.sleep.checking.multiplier:100}") + private int checkingSleepMultiplier; + + @Value("${atlas.dlq.batch.size:10}") + private int batchSize; + private ElasticSearchIndex esIndex; + private final ObjectMapper mapper; + + private volatile Thread processingThread; + private KafkaConsumer consumer; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + + public DLQReplayService() throws BackendException { + this.esIndex = new ElasticSearchIndex(Configuration.EMPTY); + this.mapper = new ObjectMapper(); + } + + /** + * Get replay status + */ + @PostConstruct + public void startDLQProcessing() { + // Start processing in a separate thread + processingThread = new Thread(this::processDLQMessages, "DLQ-Processing-Thread"); + processingThread.setDaemon(true); + processingThread.start(); + log.info("Started DLQ processing thread"); + } + + public Map getStatus() { + Map status = new HashMap<>(); + status.put("topic", dlqTopic); + status.put("consumerGroup", 
consumerGroupId);
+        status.put("isProcessing", processingThread != null && processingThread.isAlive());
+        return status;
+    }
+
+    /**
+     * Replay a single DLQ entry
+     */
+    private void replayDLQEntry(String dlqJson) throws Exception {
+        DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class);
+
+        log.debug("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName());
+
+        // Reconstruct mutations from serialized form
+        Map<String, Map<String, IndexMutation>> mutations = reconstructMutations(entry);
+
+        // Create key information retriever
+        KeyInformation.IndexRetriever keyInfo = createKeyInfoRetriever(entry);
+
+        // Create a new transaction for replay
+        BaseTransaction replayTx = esIndex.beginTransaction(
+            new StandardBaseTransactionConfig.Builder().build()
+        );
+
+        try {
+            // This is the same method that originally failed - now we're replaying it!
+            esIndex.mutate(mutations, keyInfo, replayTx);
+            replayTx.commit();
+
+            log.debug("Successfully replayed mutation for index: {}", entry.getIndexName());
+
+        } catch (Exception e) {
+            replayTx.rollback();
+            throw new Exception("Failed to replay mutation for index: " + entry.getIndexName(), e);
+        }
+    }
+
+    /**
+     * Reconstruct IndexMutation objects from serialized form
+     */
+    private Map<String, Map<String, IndexMutation>> reconstructMutations(DLQEntry entry) {
+        Map<String, Map<String, IndexMutation>> result = new HashMap<>();
+
+        for (Map.Entry<String, Map<String, SerializableIndexMutation>> storeEntry :
+                entry.getMutations().entrySet()) {
+
+            String storeName = storeEntry.getKey();
+            Map<String, IndexMutation> storeMutations = new HashMap<>();
+
+            for (Map.Entry<String, SerializableIndexMutation> docEntry :
+                    storeEntry.getValue().entrySet()) {
+
+                String docId = docEntry.getKey();
+                SerializableIndexMutation serMut = docEntry.getValue();
+
+                // Reconstruct IndexMutation
+                IndexMutation mutation = new IndexMutation(
+                    createStoreRetriever(storeName), // This is simplified - you may need more context
+                    serMut.isNew(),
+                    serMut.isDeleted()
+                );
+
+                // Add additions
+                for (SerializableIndexMutation.SerializableIndexEntry add : serMut.getAdditions()) {
+                    mutation.addition(new IndexEntry(add.getField(), add.getValue()));
+                }
+
+                // Add deletions
+                for (SerializableIndexMutation.SerializableIndexEntry del : serMut.getDeletions()) {
+                    mutation.deletion(new IndexEntry(del.getField(), del.getValue()));
+                }
+
+                storeMutations.put(docId, mutation);
+            }
+
+            result.put(storeName, storeMutations);
+        }
+
+        return result;
+    }
+
+    /**
+     * Create key information retriever for replay
+     */
+    private KeyInformation.IndexRetriever createKeyInfoRetriever(DLQEntry entry) {
+        return new KeyInformation.IndexRetriever() {
+            @Override
+            public KeyInformation.StoreRetriever get(String store) {
+                return new KeyInformation.StoreRetriever() {
+                    @Override
+                    public KeyInformation get(String key) {
+                        // This is a simplified implementation
+                        // In practice, you might need to store more schema information in the DLQ
+                        return createKeyInformation(store);
+                    }
+                };
+            }
+
+            @Override
+            public KeyInformation get(String store, String key) {
+                return createKeyInformation(store);
+            }
+
+            @Override
+            public void invalidate(String store) {
+                // No-op for replay
+            }
+        };
+    }
+
+    /**
+     * Create a basic KeyInformation object
+     */
+    private KeyInformation createKeyInformation(String store) {
+        // This is a simplified implementation
+        // You might need to enhance this based on your schema requirements
+        return new KeyInformation() {
+            @Override
+            public Class<?> getDataType() {
+                return String.class;
+            }
+
+            @Override
+            public Parameter[] getParameters() {
+                return new Parameter[0];
+            }
+
+            @Override
+            public Cardinality getCardinality() {
+                return Cardinality.SINGLE;
+            } 
+ }; + } + + /** + * Create a store retriever for IndexMutation + */ + private KeyInformation.StoreRetriever createStoreRetriever(String store) { + return new KeyInformation.StoreRetriever() { + @Override + public KeyInformation get(String key) { + return createKeyInformation(store); + } + }; + } + + private void processDLQMessages() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); // Process in small batches + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(maxPollTimeoutSeconds * 1000)); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollTimeoutSeconds * 2500)); + + try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { + consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + log.debug("DLQ partitions revoked from this consumer: {}", partitions); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + log.debug("DLQ partitions assigned to this consumer: {}", partitions); + if (partitions.isEmpty()) { + log.debug("No partitions assigned to this consumer - another pod is handling the DLQ processing"); + } else { + // Update remaining messages count when we get partition assignment + try { + Map endOffsets = consumer.endOffsets(partitions); + Map committed = new HashMap<>(); + for (TopicPartition partition : partitions) { + OffsetAndMetadata offsetMeta = consumer.committed(Collections.singleton(partition)).get(partition); + committed.put(partition, offsetMeta != null ? 
offsetMeta.offset() : 0L); + } + + long remaining = 0; + for (TopicPartition partition : partitions) { + remaining += endOffsets.get(partition) - committed.get(partition); + } + DLQ_MESSAGES_REMAINING.set(remaining); + } catch (Exception e) { + log.error("Failed to calculate remaining messages", e); + DLQ_MESSAGES_REMAINING.set(-1); + } + } + } + }); + + log.info("Subscribed to DLQ topic: {} with consumer group: {}", dlqTopic, consumerGroupId); + + Duration pollTimeout = Duration.ofSeconds(pollTimeoutSeconds); + Duration backoffInterval = Duration.ofSeconds(initialBackoffSeconds); + int emptyPollCount = 0; + boolean caughtUp = false; + Map pendingOffsets = new HashMap<>(); + + while (!Thread.currentThread().isInterrupted()) { + try { + ConsumerRecords records = consumer.poll(pollTimeout); + + if (records.isEmpty()) { + emptyPollCount++; + if (!caughtUp) { + // Check if we've caught up with the latest offset + Map endOffsets = consumer.endOffsets(consumer.assignment()); + Map currentOffsets = new HashMap<>(); + for (TopicPartition partition : consumer.assignment()) { + currentOffsets.put(partition, consumer.position(partition)); + } + + caughtUp = endOffsets.entrySet().stream() + .allMatch(e -> e.getValue() <= currentOffsets.get(e.getKey())); + + if (caughtUp) { + log.debug("Caught up with latest offset, entering low-power mode"); + pollTimeout = Duration.ofSeconds(maxPollTimeoutSeconds); + + // Pause partitions to reduce CPU usage + consumer.pause(consumer.assignment()); + } + } + + // Exponential backoff with proper sleep when no messages + if (emptyPollCount > 1) { + // Calculate backoff time (exponential up to max) + long backoffSeconds = Math.min(backoffInterval.getSeconds() * 2, maxPollTimeoutSeconds); + backoffInterval = Duration.ofSeconds(backoffSeconds); + + if (caughtUp) { + // In low-power mode, sleep for longer periods + Thread.sleep(backoffSeconds * caughtUpSleepMultiplier); // Longer sleep when caught up + Thread.yield(); // Explicitly yield CPU + } else { + // When not caught up, use shorter sleep + Thread.sleep(Math.min(1000, backoffSeconds * checkingSleepMultiplier)); + } + } + continue; + } + + // Reset backoff when we get messages + emptyPollCount = 0; + backoffInterval = Duration.ofSeconds(initialBackoffSeconds); + + // Resume partitions if they were paused + if (caughtUp) { + caughtUp = false; + pollTimeout = Duration.ofSeconds(pollTimeoutSeconds); + consumer.resume(consumer.assignment()); + } + + log.info("Processing {} DLQ messages", records.count()); + + for (ConsumerRecord record : records) { + try { + replayDLQEntry(record.value()); + + // Store offset for batch commit + TopicPartition partition = new TopicPartition(record.topic(), record.partition()); + pendingOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1)); + + // Update metrics immediately for accurate tracking + DLQ_MESSAGES_PROCESSED.inc(); + DLQ_MESSAGES_REMAINING.dec(); + + // Commit if we've reached batch size or it's the last record in the poll + boolean isLastRecord = record.equals(records.records(partition).get(records.records(partition).size() - 1)); + if (pendingOffsets.size() >= batchSize || isLastRecord) { + consumer.commitSync(pendingOffsets); + log.debug("Committed batch of {} offsets", pendingOffsets.size()); + pendingOffsets.clear(); + } + + log.debug("Successfully replayed DLQ entry (offset: {})", record.offset()); + + } catch (Exception e) { + DLQ_PROCESSING_ERRORS.inc(); + log.error("Failed to replay DLQ entry (offset: {})", record.offset(), e); + + // On error, commit any 
successful offsets before retrying + if (!pendingOffsets.isEmpty()) { + try { + consumer.commitSync(pendingOffsets); + log.debug("Committed {} successful offsets before error handling", pendingOffsets.size()); + pendingOffsets.clear(); + } catch (Exception commitEx) { + log.error("Failed to commit offsets after error", commitEx); + } + } + + Thread.sleep(pollTimeoutSeconds * 1000); // Wait before retrying + } + } + } catch (Exception e) { + log.error("Error in DLQ processing", e); + Thread.sleep(pollTimeoutSeconds * 1000); // Wait before retrying + } + } + } catch (Exception e) { + log.error("Fatal error in DLQ processing thread", e); + } + } +} From 69e6c5d5118b613ad67575733ff83dfd8cf43737 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 11:44:36 +0530 Subject: [PATCH 06/23] MLH-1477 dummy commit --- LOCAL_SETUP.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/LOCAL_SETUP.md b/LOCAL_SETUP.md index 0f30a2800b0..07793a28fb6 100644 --- a/LOCAL_SETUP.md +++ b/LOCAL_SETUP.md @@ -150,5 +150,4 @@ For more detailed information, refer to: - The build command skips tests and various checks for faster development builds - For production builds, remove the skip flags - Keep your GitHub PAT token secure and never commit it to version control -- Adjust memory and CPU settings in Colima based on your machine's capabilities -- Dummy commit \ No newline at end of file +- Adjust memory and CPU settings in Colima based on your machine's capabilities \ No newline at end of file From eb01bd6c9794091d6d6180826b264cb2c2f6fb82 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 12:37:29 +0530 Subject: [PATCH 07/23] MLH-1477 wire in only elastic configuration --- .../atlas/web/service/DLQReplayService.java | 347 +++++++----------- 1 file changed, 132 insertions(+), 215 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 0f5dd865273..86615451fd5 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -1,13 +1,11 @@ package org.apache.atlas.web.service; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.serialization.StringDeserializer; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.BaseTransaction; @@ -21,17 +19,20 @@ import org.janusgraph.diskstorage.es.ElasticSearchIndex; import org.janusgraph.core.Cardinality; import org.janusgraph.core.schema.Parameter; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.database.StandardJanusGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import io.prometheus.client.Gauge; -import io.prometheus.client.Counter; -import javax.annotation.PostConstruct; import java.time.Duration; -import 
java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * Service for replaying DLQ messages back to Elasticsearch. @@ -42,75 +43,131 @@ public class DLQReplayService { private static final Logger log = LoggerFactory.getLogger(DLQReplayService.class); - private static final Gauge DLQ_MESSAGES_REMAINING = Gauge.build() - .name("atlas_dlq_messages_remaining") - .help("Number of messages remaining to be processed in the DLQ topic") - .register(); - - private static final Counter DLQ_MESSAGES_PROCESSED = Counter.build() - .name("atlas_dlq_messages_processed_total") - .help("Total number of DLQ messages successfully processed") - .register(); - - private static final Counter DLQ_PROCESSING_ERRORS = Counter.build() - .name("atlas_dlq_processing_errors_total") - .help("Total number of errors encountered while processing DLQ messages") - .register(); - @Value("${atlas.kafka.bootstrap.servers:localhost:9092}") private final String bootstrapServers = "localhost:9092"; @Value("${atlas.kafka.dlq.topic:ATLAS_ES_DLQ}") private final String dlqTopic="ATLAS_ES_DLQ"; @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") private final String consumerGroupId= "atlas_dq_replay_group"; + private ElasticSearchIndex esIndex; + private final ObjectMapper mapper; - @Value("${atlas.dlq.poll.timeout.seconds:60}") - private int pollTimeoutSeconds; + private volatile KafkaConsumer consumer; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicInteger processedCount = new AtomicInteger(0); + private final AtomicInteger errorCount = new AtomicInteger(0); + + public DLQReplayService(AtlasJanusGraph graph) throws BackendException { + // Extract ES configuration from existing graph + GraphDatabaseConfiguration graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); + Configuration fullConfig = graphConfig.getConfiguration(); + + // Restrict to the index namespace + Configuration indexConfig = fullConfig.restrictTo("elasticsearch"); + this.esIndex = new ElasticSearchIndex(indexConfig); + this.mapper = new ObjectMapper(); + } - @Value("${atlas.dlq.poll.timeout.max.seconds:120}") - private int maxPollTimeoutSeconds; + /** + * Start replaying DLQ messages + */ + public synchronized void startReplay() { + if (isRunning.get()) { + log.warn("DLQ replay is already running"); + return; + } - @Value("${atlas.dlq.backoff.initial.seconds:1}") - private int initialBackoffSeconds; + log.info("Starting DLQ replay service for topic: {} with consumer group: {}", dlqTopic, consumerGroupId); - @Value("${atlas.dlq.sleep.caught.up.multiplier:1000}") - private int caughtUpSleepMultiplier; + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit after success + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from beginning + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Process in small batches - 
@Value("${atlas.dlq.sleep.checking.multiplier:100}") - private int checkingSleepMultiplier; + consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Collections.singletonList(dlqTopic)); - @Value("${atlas.dlq.batch.size:10}") - private int batchSize; - private ElasticSearchIndex esIndex; - private final ObjectMapper mapper; + isRunning.set(true); - private volatile Thread processingThread; - private KafkaConsumer consumer; - private final AtomicBoolean isRunning = new AtomicBoolean(false); + // Start processing in a separate thread + Thread replayThread = new Thread(this::processMessages, "DLQ-Replay-Thread"); + replayThread.setDaemon(true); + replayThread.start(); - public DLQReplayService() throws BackendException { - this.esIndex = new ElasticSearchIndex(Configuration.EMPTY); - this.mapper = new ObjectMapper(); + log.info("DLQ replay service started successfully"); } /** - * Get replay status + * Stop replaying DLQ messages */ - @PostConstruct - public void startDLQProcessing() { - // Start processing in a separate thread - processingThread = new Thread(this::processDLQMessages, "DLQ-Processing-Thread"); - processingThread.setDaemon(true); - processingThread.start(); - log.info("Started DLQ processing thread"); + public synchronized void stopReplay() { + if (!isRunning.get()) { + log.warn("DLQ replay is not running"); + return; + } + + log.info("Stopping DLQ replay service..."); + isRunning.set(false); + + if (consumer != null) { + consumer.close(); + consumer = null; + } + + log.info("DLQ replay service stopped. Processed: {}, Errors: {}", + processedCount.get(), errorCount.get()); } - public Map getStatus() { - Map status = new HashMap<>(); - status.put("topic", dlqTopic); - status.put("consumerGroup", consumerGroupId); - status.put("isProcessing", processingThread != null && processingThread.isAlive()); - return status; + /** + * Process messages from the DLQ topic + */ + private void processMessages() { + log.info("DLQ replay thread started, polling for messages..."); + + while (isRunning.get()) { + try { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + + if (records.isEmpty()) { + continue; // No messages, continue polling + } + + log.info("Received {} DLQ messages to replay", records.count()); + + for (ConsumerRecord record : records) { + try { + replayDLQEntry(record.value()); + consumer.commitSync(); // Commit only after successful replay + processedCount.incrementAndGet(); + + log.info("Successfully replayed DLQ entry (offset: {})", record.offset()); + + } catch (Exception e) { + errorCount.incrementAndGet(); + log.error("Failed to replay DLQ entry (offset: {}), will retry later", + record.offset(), e); + + // Don't commit - message will be reprocessed + break; // Stop processing this batch to retry later + } + } + + } catch (Exception e) { + log.error("Error in DLQ replay processing", e); + try { + Thread.sleep(5000); // Wait before retrying + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + + log.info("DLQ replay thread finished"); } /** @@ -254,165 +311,25 @@ public KeyInformation get(String key) { }; } - private void processDLQMessages() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); // Process in small batches - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(maxPollTimeoutSeconds * 1000)); - props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollTimeoutSeconds * 2500)); - - try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { - consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection partitions) { - log.debug("DLQ partitions revoked from this consumer: {}", partitions); - } - - @Override - public void onPartitionsAssigned(Collection partitions) { - log.debug("DLQ partitions assigned to this consumer: {}", partitions); - if (partitions.isEmpty()) { - log.debug("No partitions assigned to this consumer - another pod is handling the DLQ processing"); - } else { - // Update remaining messages count when we get partition assignment - try { - Map endOffsets = consumer.endOffsets(partitions); - Map committed = new HashMap<>(); - for (TopicPartition partition : partitions) { - OffsetAndMetadata offsetMeta = consumer.committed(Collections.singleton(partition)).get(partition); - committed.put(partition, offsetMeta != null ? offsetMeta.offset() : 0L); - } - - long remaining = 0; - for (TopicPartition partition : partitions) { - remaining += endOffsets.get(partition) - committed.get(partition); - } - DLQ_MESSAGES_REMAINING.set(remaining); - } catch (Exception e) { - log.error("Failed to calculate remaining messages", e); - DLQ_MESSAGES_REMAINING.set(-1); - } - } - } - }); - - log.info("Subscribed to DLQ topic: {} with consumer group: {}", dlqTopic, consumerGroupId); - - Duration pollTimeout = Duration.ofSeconds(pollTimeoutSeconds); - Duration backoffInterval = Duration.ofSeconds(initialBackoffSeconds); - int emptyPollCount = 0; - boolean caughtUp = false; - Map pendingOffsets = new HashMap<>(); + /** + * Get replay status + */ + public Map getStatus() { + Map status = new HashMap<>(); + status.put("isRunning", isRunning.get()); + status.put("processedCount", processedCount.get()); + status.put("errorCount", errorCount.get()); + status.put("topic", dlqTopic); + status.put("consumerGroup", consumerGroupId); + return status; + } - while (!Thread.currentThread().isInterrupted()) { - try { - ConsumerRecords records = consumer.poll(pollTimeout); - - if (records.isEmpty()) { - emptyPollCount++; - if (!caughtUp) { - // Check if we've caught up with the latest offset - Map endOffsets = consumer.endOffsets(consumer.assignment()); - Map currentOffsets = new HashMap<>(); - for (TopicPartition partition : consumer.assignment()) { - currentOffsets.put(partition, consumer.position(partition)); - } - - caughtUp = endOffsets.entrySet().stream() - .allMatch(e -> e.getValue() <= currentOffsets.get(e.getKey())); - - if (caughtUp) { - log.debug("Caught up with latest offset, entering low-power mode"); - pollTimeout = Duration.ofSeconds(maxPollTimeoutSeconds); - - // Pause partitions to reduce CPU usage - consumer.pause(consumer.assignment()); - } - } - - // Exponential backoff with proper sleep when no messages - if (emptyPollCount > 1) { - // Calculate backoff time (exponential up to max) - long backoffSeconds = Math.min(backoffInterval.getSeconds() * 2, maxPollTimeoutSeconds); - backoffInterval = Duration.ofSeconds(backoffSeconds); - - if 
(caughtUp) { - // In low-power mode, sleep for longer periods - Thread.sleep(backoffSeconds * caughtUpSleepMultiplier); // Longer sleep when caught up - Thread.yield(); // Explicitly yield CPU - } else { - // When not caught up, use shorter sleep - Thread.sleep(Math.min(1000, backoffSeconds * checkingSleepMultiplier)); - } - } - continue; - } - - // Reset backoff when we get messages - emptyPollCount = 0; - backoffInterval = Duration.ofSeconds(initialBackoffSeconds); - - // Resume partitions if they were paused - if (caughtUp) { - caughtUp = false; - pollTimeout = Duration.ofSeconds(pollTimeoutSeconds); - consumer.resume(consumer.assignment()); - } - - log.info("Processing {} DLQ messages", records.count()); - - for (ConsumerRecord record : records) { - try { - replayDLQEntry(record.value()); - - // Store offset for batch commit - TopicPartition partition = new TopicPartition(record.topic(), record.partition()); - pendingOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1)); - - // Update metrics immediately for accurate tracking - DLQ_MESSAGES_PROCESSED.inc(); - DLQ_MESSAGES_REMAINING.dec(); - - // Commit if we've reached batch size or it's the last record in the poll - boolean isLastRecord = record.equals(records.records(partition).get(records.records(partition).size() - 1)); - if (pendingOffsets.size() >= batchSize || isLastRecord) { - consumer.commitSync(pendingOffsets); - log.debug("Committed batch of {} offsets", pendingOffsets.size()); - pendingOffsets.clear(); - } - - log.debug("Successfully replayed DLQ entry (offset: {})", record.offset()); - - } catch (Exception e) { - DLQ_PROCESSING_ERRORS.inc(); - log.error("Failed to replay DLQ entry (offset: {})", record.offset(), e); - - // On error, commit any successful offsets before retrying - if (!pendingOffsets.isEmpty()) { - try { - consumer.commitSync(pendingOffsets); - log.debug("Committed {} successful offsets before error handling", pendingOffsets.size()); - pendingOffsets.clear(); - } catch (Exception commitEx) { - log.error("Failed to commit offsets after error", commitEx); - } - } - - Thread.sleep(pollTimeoutSeconds * 1000); // Wait before retrying - } - } - } catch (Exception e) { - log.error("Error in DLQ processing", e); - Thread.sleep(pollTimeoutSeconds * 1000); // Wait before retrying - } - } - } catch (Exception e) { - log.error("Fatal error in DLQ processing thread", e); - } + /** + * Reset counters + */ + public void resetCounters() { + processedCount.set(0); + errorCount.set(0); + log.info("DLQ replay counters reset"); } } From 8070a96e1ae72dfb08c00b219f2ff39bda697f4e Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 14:13:55 +0530 Subject: [PATCH 08/23] MLH-1477 lazily connect to elasticsearch index --- .../atlas/web/service/DLQReplayService.java | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 86615451fd5..fd4d92f3fae 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -49,23 +49,34 @@ public class DLQReplayService { private final String dlqTopic="ATLAS_ES_DLQ"; @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") private final String consumerGroupId= "atlas_dq_replay_group"; - private ElasticSearchIndex esIndex; - private final ObjectMapper mapper; + private volatile 
ElasticSearchIndex esIndex; + private final ObjectMapper mapper = new ObjectMapper(); + private final Configuration elasticsearchConfig; private volatile KafkaConsumer consumer; private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicInteger processedCount = new AtomicInteger(0); private final AtomicInteger errorCount = new AtomicInteger(0); - public DLQReplayService(AtlasJanusGraph graph) throws BackendException { + public DLQReplayService(AtlasJanusGraph graph) { // Extract ES configuration from existing graph GraphDatabaseConfiguration graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); Configuration fullConfig = graphConfig.getConfiguration(); - // Restrict to the index namespace - Configuration indexConfig = fullConfig.restrictTo("elasticsearch"); - this.esIndex = new ElasticSearchIndex(indexConfig); - this.mapper = new ObjectMapper(); + // Store the config for lazy initialization + this.elasticsearchConfig = fullConfig.restrictTo("elasticsearch"); + } + + private synchronized void initializeElasticsearch() throws BackendException { + if (esIndex == null) { + try { + this.esIndex = new ElasticSearchIndex(elasticsearchConfig); + log.info("Successfully initialized Elasticsearch connection"); + } catch (Exception e) { + log.error("Failed to initialize Elasticsearch connection", e); + throw e; + } + } } /** @@ -174,6 +185,11 @@ private void processMessages() { * Replay a single DLQ entry */ private void replayDLQEntry(String dlqJson) throws Exception { + // Initialize Elasticsearch if not already done + if (esIndex == null) { + initializeElasticsearch(); + } + DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); log.debug("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); From 944b1fdcab8c93410f380ee6ed5e06a8d152f924 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 19:44:29 +0530 Subject: [PATCH 09/23] MLH-1477 load indexProvider like janusgraph --- .../atlas/web/service/DLQReplayService.java | 34 ++++++------------- 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index fd4d92f3fae..84bd9119a8b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -7,11 +7,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.janusgraph.diskstorage.Backend; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.BaseTransaction; +import org.janusgraph.diskstorage.StandardIndexProvider; import org.janusgraph.diskstorage.configuration.Configuration; import org.janusgraph.diskstorage.dlq.DLQEntry; import org.janusgraph.diskstorage.dlq.SerializableIndexMutation; +import org.janusgraph.diskstorage.indexing.IndexProvider; import org.janusgraph.diskstorage.util.StandardBaseTransactionConfig; import org.janusgraph.diskstorage.indexing.IndexEntry; import org.janusgraph.diskstorage.indexing.IndexMutation; @@ -26,6 +29,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -49,39 
+53,26 @@ public class DLQReplayService { private final String dlqTopic="ATLAS_ES_DLQ"; @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") private final String consumerGroupId= "atlas_dq_replay_group"; - private volatile ElasticSearchIndex esIndex; + private final ElasticSearchIndex esIndex; private final ObjectMapper mapper = new ObjectMapper(); - private final Configuration elasticsearchConfig; - private volatile KafkaConsumer consumer; private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicInteger processedCount = new AtomicInteger(0); private final AtomicInteger errorCount = new AtomicInteger(0); - public DLQReplayService(AtlasJanusGraph graph) { + public DLQReplayService(AtlasJanusGraph graph) throws BackendException { // Extract ES configuration from existing graph GraphDatabaseConfiguration graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); Configuration fullConfig = graphConfig.getConfiguration(); - - // Store the config for lazy initialization - this.elasticsearchConfig = fullConfig.restrictTo("elasticsearch"); - } - - private synchronized void initializeElasticsearch() throws BackendException { - if (esIndex == null) { - try { - this.esIndex = new ElasticSearchIndex(elasticsearchConfig); - log.info("Successfully initialized Elasticsearch connection"); - } catch (Exception e) { - log.error("Failed to initialize Elasticsearch connection", e); - throw e; - } - } + IndexProvider indexProvider = Backend.getImplementationClass(fullConfig.restrictTo("search"), fullConfig.get(GraphDatabaseConfiguration.INDEX_BACKEND,"search"), + StandardIndexProvider.getAllProviderClasses()); + esIndex = (ElasticSearchIndex) indexProvider; } /** * Start replaying DLQ messages */ + @PostConstruct public synchronized void startReplay() { if (isRunning.get()) { log.warn("DLQ replay is already running"); @@ -185,11 +176,6 @@ private void processMessages() { * Replay a single DLQ entry */ private void replayDLQEntry(String dlqJson) throws Exception { - // Initialize Elasticsearch if not already done - if (esIndex == null) { - initializeElasticsearch(); - } - DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); log.debug("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); From 14c8b0fa84e834f8a050412724f3756dc54f5aea Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 21:05:25 +0530 Subject: [PATCH 10/23] MLH-1477 initialise bootstrap servers --- .../org/apache/atlas/web/service/DLQReplayService.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 84bd9119a8b..c4d75143845 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -1,6 +1,8 @@ package org.apache.atlas.web.service; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -47,11 +49,12 @@ public class DLQReplayService { private static final Logger log = LoggerFactory.getLogger(DLQReplayService.class); - @Value("${atlas.kafka.bootstrap.servers:localhost:9092}") - private 
final String bootstrapServers = "localhost:9092"; + private String bootstrapServers; @Value("${atlas.kafka.dlq.topic:ATLAS_ES_DLQ}") + private final String dlqTopic="ATLAS_ES_DLQ"; @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") + private final String consumerGroupId= "atlas_dq_replay_group"; private final ElasticSearchIndex esIndex; private final ObjectMapper mapper = new ObjectMapper(); @@ -60,9 +63,10 @@ public class DLQReplayService { private final AtomicInteger processedCount = new AtomicInteger(0); private final AtomicInteger errorCount = new AtomicInteger(0); - public DLQReplayService(AtlasJanusGraph graph) throws BackendException { + public DLQReplayService(AtlasJanusGraph graph) throws BackendException, AtlasException { // Extract ES configuration from existing graph GraphDatabaseConfiguration graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); + this.bootstrapServers = ApplicationProperties.get().getString("atlas.graph.kafka.bootstrap.servers"); Configuration fullConfig = graphConfig.getConfiguration(); IndexProvider indexProvider = Backend.getImplementationClass(fullConfig.restrictTo("search"), fullConfig.get(GraphDatabaseConfiguration.INDEX_BACKEND,"search"), StandardIndexProvider.getAllProviderClasses()); From d2d098e4efb0e5e62c55257cd27d91522cb2f7a7 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 27 Oct 2025 22:39:50 +0530 Subject: [PATCH 11/23] MLH-1477 increase timeout --- .../atlas/web/rest/DLQAdminController.java | 12 ++++++ .../atlas/web/service/DLQReplayService.java | 39 +++++++++++++++---- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java b/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java index ae1cd1c4f05..ee506dee60b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java @@ -20,6 +20,18 @@ public class DLQAdminController { @Autowired private DLQReplayService dlqReplayService; + @POST + @Path("/replay/start") + public void startReplay() { + dlqReplayService.startReplay(); + } + + @POST + @Path("/replay/stop") + public void stopReplay() { + dlqReplayService.stopReplay(); + } + @GET @Path("/replay/status") public Map getStatus() { diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index c4d75143845..091027c7c3d 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -8,6 +8,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.serialization.StringDeserializer; import org.janusgraph.diskstorage.Backend; import org.janusgraph.diskstorage.BackendException; @@ -33,10 +35,7 @@ import javax.annotation.PostConstruct; import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -92,10 +91,25 @@ public synchronized void startReplay() { 
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit after success consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from beginning - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Process in small batches + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); // Process in smaller batches + + // Increase timeouts to handle slower processing + consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); // 1 minute + consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // 10 seconds consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Collections.singletonList(dlqTopic)); + consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + log.warn("Consumer group partitions revoked. This might indicate processing is too slow. Partitions: {}", partitions); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + log.info("Consumer group partitions assigned: {}", partitions); + } + }); isRunning.set(true); @@ -163,12 +177,21 @@ private void processMessages() { } } catch (Exception e) { - log.error("Error in DLQ replay processing", e); + log.error("Error in DLQ replay processing. Consumer might be removed from group due to slow processing", e); try { - Thread.sleep(5000); // Wait before retrying + // Give more time for recovery + Thread.sleep(30000); // 30 seconds before retrying + + // Try to rejoin the consumer group + consumer.enforceRebalance(); + log.info("Enforced consumer group rebalance after error"); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; + } catch (Exception re) { + log.error("Failed to recover consumer after error", re); + stopReplay(); // Stop processing if we can't recover + break; } } } From c619f0ca825ccca845e41ca3a2b5caac254ce5f7 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 28 Oct 2025 00:28:03 +0530 Subject: [PATCH 12/23] MLH-1477 add more logs for debugging. 
update serializer --- .../atlas/web/service/DLQReplayService.java | 155 +++++++++++++++--- 1 file changed, 135 insertions(+), 20 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 091027c7c3d..63a20213f6b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -1,15 +1,18 @@ package org.apache.atlas.web.service; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.core.JsonParser; +import java.io.IOException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.serialization.StringDeserializer; import org.janusgraph.diskstorage.Backend; import org.janusgraph.diskstorage.BackendException; @@ -35,6 +38,7 @@ import javax.annotation.PostConstruct; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -56,13 +60,59 @@ public class DLQReplayService { private final String consumerGroupId= "atlas_dq_replay_group"; private final ElasticSearchIndex esIndex; - private final ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper; + + private ObjectMapper configureMapper() { + ObjectMapper mapper = new ObjectMapper(); + // Configure to handle property name differences + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + // Add custom deserializer for SerializableIndexMutation + SimpleModule module = new SimpleModule(); + module.addDeserializer(SerializableIndexMutation.class, new JsonDeserializer() { + @Override + public SerializableIndexMutation deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + + // Handle both "new" and "isNew" fields + boolean isNew = node.has("new") ? node.get("new").asBoolean() : + node.has("isNew") ? node.get("isNew").asBoolean() : false; + + boolean isDeleted = node.has("isDeleted") ? 
node.get("isDeleted").asBoolean() : false; + + List additions = new ArrayList<>(); + List deletions = new ArrayList<>(); + + if (node.has("additions") && node.get("additions").isArray()) { + for (JsonNode entry : node.get("additions")) { + additions.add(new SerializableIndexMutation.SerializableIndexEntry( + entry.get("field").asText(), + mapper.treeToValue(entry.get("value"), Object.class) + )); + } + } + + if (node.has("deletions") && node.get("deletions").isArray()) { + for (JsonNode entry : node.get("deletions")) { + deletions.add(new SerializableIndexMutation.SerializableIndexEntry( + entry.get("field").asText(), + mapper.treeToValue(entry.get("value"), Object.class) + )); + } + } + + return new SerializableIndexMutation(isNew, isDeleted, additions, deletions); + } + }); + mapper.registerModule(module); + return mapper; + } private volatile KafkaConsumer consumer; private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicInteger processedCount = new AtomicInteger(0); private final AtomicInteger errorCount = new AtomicInteger(0); public DLQReplayService(AtlasJanusGraph graph) throws BackendException, AtlasException { + this.mapper = configureMapper(); // Extract ES configuration from existing graph GraphDatabaseConfiguration graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); this.bootstrapServers = ApplicationProperties.get().getString("atlas.graph.kafka.bootstrap.servers"); @@ -91,12 +141,16 @@ public synchronized void startReplay() { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit after success consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from beginning - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); // Process in smaller batches + // Process only 2 records at a time since processing is slow + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2"); - // Increase timeouts to handle slower processing - consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); // 1 minute + // Significantly increase timeouts to handle very slow processing + consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 minutes consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // 10 seconds + + // Add backoff between polls to give more processing time + consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000"); // Wait up to 5 seconds for messages consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { @@ -105,10 +159,30 @@ public void onPartitionsRevoked(Collection partitions) { log.warn("Consumer group partitions revoked. This might indicate processing is too slow. 
Partitions: {}", partitions); } - @Override - public void onPartitionsAssigned(Collection partitions) { - log.info("Consumer group partitions assigned: {}", partitions); - } + @Override + public void onPartitionsAssigned(Collection partitions) { + log.info("Consumer group partitions assigned: {}", partitions); + + // Log offset information for each partition + for (TopicPartition partition : partitions) { + try { + long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); + long committedOffset = -1; + OffsetAndMetadata committed = consumer.committed(Collections.singleton(partition)).get(partition); + if (committed != null) { + committedOffset = committed.offset(); + } + long position = consumer.position(partition); + + log.info("Partition {} - End offset: {}, Committed offset: {}, Current position: {}, " + + "Messages available: {}", + partition, endOffset, committedOffset, position, + endOffset - position); + } catch (Exception e) { + log.error("Error checking offsets for partition: " + partition, e); + } + } + } }); isRunning.set(true); @@ -150,10 +224,38 @@ private void processMessages() { while (isRunning.get()) { try { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + // Log current position before poll + for (TopicPartition partition : consumer.assignment()) { + try { + long currentPosition = consumer.position(partition); + long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); + log.debug("Before poll - Partition: {}, Position: {}, End Offset: {}, Available: {}", + partition, currentPosition, endOffset, endOffset - currentPosition); + } catch (Exception e) { + log.error("Error checking position before poll", e); + } + } + + ConsumerRecords records = consumer.poll(Duration.ofSeconds(30)); if (records.isEmpty()) { - continue; // No messages, continue polling + // Log why we got no records + for (TopicPartition partition : consumer.assignment()) { + try { + long currentPosition = consumer.position(partition); + long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); + if (currentPosition >= endOffset) { + log.info("No messages available - Partition {} at end (Position: {}, End: {})", + partition, currentPosition, endOffset); + } else { + log.info("No messages returned despite availability - Partition {} (Position: {}, End: {}, Available: {})", + partition, currentPosition, endOffset, endOffset - currentPosition); + } + } catch (Exception e) { + log.error("Error checking position after empty poll", e); + } + } + continue; } log.info("Received {} DLQ messages to replay", records.count()); @@ -203,29 +305,42 @@ private void processMessages() { * Replay a single DLQ entry */ private void replayDLQEntry(String dlqJson) throws Exception { + long startTime = System.currentTimeMillis(); DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); - - log.debug("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); + log.info("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); // Reconstruct mutations from serialized form + log.info("Starting mutation reconstruction for index: {}", entry.getIndexName()); Map> mutations = reconstructMutations(entry); + log.info("Completed mutation reconstruction in {}ms", System.currentTimeMillis() - startTime); // Create key information retriever + log.info("Creating key information retriever for index: {}", entry.getIndexName()); KeyInformation.IndexRetriever keyInfo = 
createKeyInfoRetriever(entry); // Create a new transaction for replay + log.info("Beginning transaction for index: {}", entry.getIndexName()); BaseTransaction replayTx = esIndex.beginTransaction( - new StandardBaseTransactionConfig.Builder().build() + new StandardBaseTransactionConfig.Builder().commitTime(Instant.now()).build() ); try { // This is the same method that originally failed - now we're replaying it! + log.info("Starting ES mutation for index: {}", entry.getIndexName()); + long mutateStartTime = System.currentTimeMillis(); esIndex.mutate(mutations, keyInfo, replayTx); + log.info("ES mutation completed in {}ms, committing transaction", System.currentTimeMillis() - mutateStartTime); + + long commitStartTime = System.currentTimeMillis(); replayTx.commit(); + log.info("Transaction commit completed in {}ms", System.currentTimeMillis() - commitStartTime); - log.debug("Successfully replayed mutation for index: {}", entry.getIndexName()); + log.info("Successfully replayed mutation for index: {}. Total time: {}ms", + entry.getIndexName(), System.currentTimeMillis() - startTime); } catch (Exception e) { + log.warn("Error replaying mutation for index: {}, rolling back transaction", + entry.getIndexName(), e); replayTx.rollback(); throw new Exception("Failed to replay mutation for index: " + entry.getIndexName(), e); } From 6b8b77f5ee9c73d8309e6bedbd4e7a35daaed0f9 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 28 Oct 2025 00:37:19 +0530 Subject: [PATCH 13/23] MLH-1477 reduce poll time --- .../atlas/web/service/DLQReplayService.java | 135 +++++++++++------- 1 file changed, 82 insertions(+), 53 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 63a20213f6b..8662cfc25f3 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -141,16 +141,11 @@ public synchronized void startReplay() { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit after success consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from beginning - // Process only 2 records at a time since processing is slow - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2"); - - // Significantly increase timeouts to handle very slow processing - consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 minutes - consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // 10 seconds - - // Add backoff between polls to give more processing time - consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000"); // Wait up to 5 seconds for messages + // Use reasonable batch size and timeouts + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); + consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 30 seconds + consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // 3 seconds consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { @@ -221,22 +216,33 @@ public synchronized void 
stopReplay() { */ private void processMessages() { log.info("DLQ replay thread started, polling for messages..."); + long lastPollTime = System.currentTimeMillis(); + long lastCommitTime = System.currentTimeMillis(); + int processedInBatch = 0; while (isRunning.get()) { try { - // Log current position before poll - for (TopicPartition partition : consumer.assignment()) { + long now = System.currentTimeMillis(); + long timeSinceLastPoll = now - lastPollTime; + + // If we're taking too long between polls, log a warning + if (timeSinceLastPoll > 60000) { // 1 minute + log.warn("Long delay between polls: {}ms. This could lead to consumer group removal.", + timeSinceLastPoll); + } + + // Commit any pending offsets if we haven't in a while + if (now - lastCommitTime > 30000) { // 30 seconds try { - long currentPosition = consumer.position(partition); - long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); - log.debug("Before poll - Partition: {}, Position: {}, End Offset: {}, Available: {}", - partition, currentPosition, endOffset, endOffset - currentPosition); + consumer.commitSync(); + lastCommitTime = now; + log.debug("Committed offsets after timeout"); } catch (Exception e) { - log.error("Error checking position before poll", e); + log.error("Failed to commit offsets", e); } } - ConsumerRecords records = consumer.poll(Duration.ofSeconds(30)); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); if (records.isEmpty()) { // Log why we got no records @@ -263,9 +269,20 @@ private void processMessages() { for (ConsumerRecord record : records) { try { replayDLQEntry(record.value()); - consumer.commitSync(); // Commit only after successful replay + + // Track processing time and commit more frequently + processedInBatch++; processedCount.incrementAndGet(); - + + // Commit every 2 messages or if it's been too long + if (processedInBatch >= 2 || (now - lastCommitTime > 30000)) { + consumer.commitSync(); + lastCommitTime = now; + processedInBatch = 0; + log.debug("Committed offset after batch or timeout"); + } + + lastPollTime = System.currentTimeMillis(); // Reset poll timer after successful processing log.info("Successfully replayed DLQ entry (offset: {})", record.offset()); } catch (Exception e) { @@ -306,43 +323,55 @@ private void processMessages() { */ private void replayDLQEntry(String dlqJson) throws Exception { long startTime = System.currentTimeMillis(); - DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); - log.info("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); + long memoryBefore = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + try { + DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); + log.info("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); - // Reconstruct mutations from serialized form - log.info("Starting mutation reconstruction for index: {}", entry.getIndexName()); - Map> mutations = reconstructMutations(entry); - log.info("Completed mutation reconstruction in {}ms", System.currentTimeMillis() - startTime); + // Reconstruct mutations from serialized form + log.info("Starting mutation reconstruction for index: {}", entry.getIndexName()); + Map> mutations = reconstructMutations(entry); + log.info("Completed mutation reconstruction in {}ms", System.currentTimeMillis() - startTime); - // Create key information retriever - log.info("Creating key information retriever for index: {}", entry.getIndexName()); 
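The batch-and-commit logic above is an at-least-once pattern: with auto-commit disabled, an offset is committed only after its record has been replayed successfully, so a crash or rebalance redelivers anything uncommitted. Below is a minimal, self-contained sketch of that pattern; the broker address, topic, group id and process() method are placeholders, not names from this patch.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class AtLeastOnceReplaySketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-replay-group");      // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually, only after success
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("SKETCH_TOPIC")); // placeholder topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record.value()); // if this throws, the offset stays uncommitted and the record is redelivered
                        // commit offset + 1: the next offset the group should read from this partition
                        consumer.commitSync(Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    }
                }
            }
        }

        private static void process(String value) { /* replay work goes here */ }
    }
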
- KeyInformation.IndexRetriever keyInfo = createKeyInfoRetriever(entry); + // Create key information retriever + log.info("Creating key information retriever for index: {}", entry.getIndexName()); + KeyInformation.IndexRetriever keyInfo = createKeyInfoRetriever(entry); - // Create a new transaction for replay - log.info("Beginning transaction for index: {}", entry.getIndexName()); - BaseTransaction replayTx = esIndex.beginTransaction( - new StandardBaseTransactionConfig.Builder().commitTime(Instant.now()).build() - ); + // Create a new transaction for replay + log.info("Beginning transaction for index: {}", entry.getIndexName()); + BaseTransaction replayTx = esIndex.beginTransaction( + new StandardBaseTransactionConfig.Builder().commitTime(Instant.now()).build() + ); - try { - // This is the same method that originally failed - now we're replaying it! - log.info("Starting ES mutation for index: {}", entry.getIndexName()); - long mutateStartTime = System.currentTimeMillis(); - esIndex.mutate(mutations, keyInfo, replayTx); - log.info("ES mutation completed in {}ms, committing transaction", System.currentTimeMillis() - mutateStartTime); - - long commitStartTime = System.currentTimeMillis(); - replayTx.commit(); - log.info("Transaction commit completed in {}ms", System.currentTimeMillis() - commitStartTime); - - log.info("Successfully replayed mutation for index: {}. Total time: {}ms", - entry.getIndexName(), System.currentTimeMillis() - startTime); - - } catch (Exception e) { - log.warn("Error replaying mutation for index: {}, rolling back transaction", - entry.getIndexName(), e); - replayTx.rollback(); - throw new Exception("Failed to replay mutation for index: " + entry.getIndexName(), e); + try { + // This is the same method that originally failed - now we're replaying it! + log.info("Starting ES mutation for index: {}", entry.getIndexName()); + long mutateStartTime = System.currentTimeMillis(); + esIndex.mutate(mutations, keyInfo, replayTx); + log.info("ES mutation completed in {}ms, committing transaction", System.currentTimeMillis() - mutateStartTime); + + long commitStartTime = System.currentTimeMillis(); + replayTx.commit(); + log.info("Transaction commit completed in {}ms", System.currentTimeMillis() - commitStartTime); + + long memoryAfter = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + long memoryUsed = memoryAfter - memoryBefore; + long totalTime = System.currentTimeMillis() - startTime; + + log.info("Successfully replayed mutation for index: {}. 
Total time: {}ms, Memory used: {}MB, Current heap usage: {}MB", + entry.getIndexName(), totalTime, memoryUsed / (1024 * 1024), + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / (1024 * 1024)); + + } catch (Exception e) { + log.warn("Error replaying mutation for index: {}, rolling back transaction", + entry.getIndexName(), e); + replayTx.rollback(); + throw new Exception("Failed to replay mutation for index: " + entry.getIndexName(), e); + } + } catch (IOException e) { + log.error("Failed to deserialize DLQ entry JSON", e); + throw e; } } From 16fcaa92aeb91e3feb119f9a5673063724a02f60 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 28 Oct 2025 00:47:18 +0530 Subject: [PATCH 14/23] MLH-1477 use pause and resume --- .../atlas/web/service/DLQReplayService.java | 204 +++++++++--------- 1 file changed, 97 insertions(+), 107 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 8662cfc25f3..36c8a5b72e1 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -54,14 +54,14 @@ public class DLQReplayService { private String bootstrapServers; @Value("${atlas.kafka.dlq.topic:ATLAS_ES_DLQ}") - private final String dlqTopic="ATLAS_ES_DLQ"; - @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") + @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") private final String consumerGroupId= "atlas_dq_replay_group"; + private final ElasticSearchIndex esIndex; private final ObjectMapper mapper; - + private ObjectMapper configureMapper() { ObjectMapper mapper = new ObjectMapper(); // Configure to handle property name differences @@ -72,40 +72,41 @@ private ObjectMapper configureMapper() { @Override public SerializableIndexMutation deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { JsonNode node = p.getCodec().readTree(p); - + // Handle both "new" and "isNew" fields - boolean isNew = node.has("new") ? node.get("new").asBoolean() : - node.has("isNew") ? node.get("isNew").asBoolean() : false; - + boolean isNew = node.has("new") ? node.get("new").asBoolean() : + node.has("isNew") ? node.get("isNew").asBoolean() : false; + boolean isDeleted = node.has("isDeleted") ? 
node.get("isDeleted").asBoolean() : false; - + List additions = new ArrayList<>(); List deletions = new ArrayList<>(); - + if (node.has("additions") && node.get("additions").isArray()) { for (JsonNode entry : node.get("additions")) { additions.add(new SerializableIndexMutation.SerializableIndexEntry( - entry.get("field").asText(), - mapper.treeToValue(entry.get("value"), Object.class) + entry.get("field").asText(), + mapper.treeToValue(entry.get("value"), Object.class) )); } } - + if (node.has("deletions") && node.get("deletions").isArray()) { for (JsonNode entry : node.get("deletions")) { deletions.add(new SerializableIndexMutation.SerializableIndexEntry( - entry.get("field").asText(), - mapper.treeToValue(entry.get("value"), Object.class) + entry.get("field").asText(), + mapper.treeToValue(entry.get("value"), Object.class) )); } } - + return new SerializableIndexMutation(isNew, isDeleted, additions, deletions); } }); mapper.registerModule(module); return mapper; } + private volatile KafkaConsumer consumer; private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicInteger processedCount = new AtomicInteger(0); @@ -141,43 +142,44 @@ public synchronized void startReplay() { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit after success consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from beginning - // Use reasonable batch size and timeouts - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); - consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 30 seconds - consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // 3 seconds + + // Optimized settings for long-running message processing with pause/resume pattern + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // Process one at a time + consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // 10 minutes - safety net + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000"); // 90 seconds + consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000"); // 30 seconds consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection partitions) { - log.warn("Consumer group partitions revoked. This might indicate processing is too slow. Partitions: {}", partitions); + log.warn("Consumer group partitions revoked. 
Partitions: {}", partitions); } - @Override - public void onPartitionsAssigned(Collection partitions) { - log.info("Consumer group partitions assigned: {}", partitions); - - // Log offset information for each partition - for (TopicPartition partition : partitions) { - try { - long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); - long committedOffset = -1; - OffsetAndMetadata committed = consumer.committed(Collections.singleton(partition)).get(partition); - if (committed != null) { - committedOffset = committed.offset(); - } - long position = consumer.position(partition); - - log.info("Partition {} - End offset: {}, Committed offset: {}, Current position: {}, " + - "Messages available: {}", - partition, endOffset, committedOffset, position, - endOffset - position); - } catch (Exception e) { - log.error("Error checking offsets for partition: " + partition, e); + @Override + public void onPartitionsAssigned(Collection partitions) { + log.info("Consumer group partitions assigned: {}", partitions); + + // Log offset information for each partition + for (TopicPartition partition : partitions) { + try { + long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); + long committedOffset = -1; + OffsetAndMetadata committed = consumer.committed(Collections.singleton(partition)).get(partition); + if (committed != null) { + committedOffset = committed.offset(); } + long position = consumer.position(partition); + + log.info("Partition {} - End offset: {}, Committed offset: {}, Current position: {}, " + + "Messages available: {}", + partition, endOffset, committedOffset, position, + endOffset - position); + } catch (Exception e) { + log.error("Error checking offsets for partition: " + partition, e); } } + } }); isRunning.set(true); @@ -212,36 +214,13 @@ public synchronized void stopReplay() { } /** - * Process messages from the DLQ topic + * Process messages from the DLQ topic using pause/resume pattern */ private void processMessages() { log.info("DLQ replay thread started, polling for messages..."); - long lastPollTime = System.currentTimeMillis(); - long lastCommitTime = System.currentTimeMillis(); - int processedInBatch = 0; while (isRunning.get()) { try { - long now = System.currentTimeMillis(); - long timeSinceLastPoll = now - lastPollTime; - - // If we're taking too long between polls, log a warning - if (timeSinceLastPoll > 60000) { // 1 minute - log.warn("Long delay between polls: {}ms. 
This could lead to consumer group removal.", - timeSinceLastPoll); - } - - // Commit any pending offsets if we haven't in a while - if (now - lastCommitTime > 30000) { // 30 seconds - try { - consumer.commitSync(); - lastCommitTime = now; - log.debug("Committed offsets after timeout"); - } catch (Exception e) { - log.error("Failed to commit offsets", e); - } - } - ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); if (records.isEmpty()) { @@ -251,10 +230,10 @@ private void processMessages() { long currentPosition = consumer.position(partition); long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition); if (currentPosition >= endOffset) { - log.info("No messages available - Partition {} at end (Position: {}, End: {})", + log.debug("No messages available - Partition {} at end (Position: {}, End: {})", partition, currentPosition, endOffset); } else { - log.info("No messages returned despite availability - Partition {} (Position: {}, End: {}, Available: {})", + log.info("No messages returned despite availability - Partition {} (Position: {}, End: {}, Available: {})", partition, currentPosition, endOffset, endOffset - currentPosition); } } catch (Exception e) { @@ -266,56 +245,63 @@ private void processMessages() { log.info("Received {} DLQ messages to replay", records.count()); - for (ConsumerRecord record : records) { - try { - replayDLQEntry(record.value()); - - // Track processing time and commit more frequently - processedInBatch++; - processedCount.incrementAndGet(); - - // Commit every 2 messages or if it's been too long - if (processedInBatch >= 2 || (now - lastCommitTime > 30000)) { - consumer.commitSync(); - lastCommitTime = now; - processedInBatch = 0; - log.debug("Committed offset after batch or timeout"); - } - - lastPollTime = System.currentTimeMillis(); // Reset poll timer after successful processing - log.info("Successfully replayed DLQ entry (offset: {})", record.offset()); + // PAUSE consumption immediately to prevent timeout during processing + Set pausedPartitions = consumer.assignment(); + consumer.pause(pausedPartitions); + log.info("Paused consumption on partitions: {} to process messages", pausedPartitions); - } catch (Exception e) { - errorCount.incrementAndGet(); - log.error("Failed to replay DLQ entry (offset: {}), will retry later", - record.offset(), e); + try { + // Now process without time pressure - heartbeats continue automatically + for (ConsumerRecord record : records) { + try { + log.info("Processing DLQ entry at offset: {} from partition: {}", + record.offset(), record.partition()); - // Don't commit - message will be reprocessed - break; // Stop processing this batch to retry later + long processingStartTime = System.currentTimeMillis(); + replayDLQEntry(record.value()); + long processingTime = System.currentTimeMillis() - processingStartTime; + + processedCount.incrementAndGet(); + + // Commit after each successful message + Map offsets = Collections.singletonMap( + new TopicPartition(record.topic(), record.partition()), + new OffsetAndMetadata(record.offset() + 1) + ); + consumer.commitSync(offsets); + + log.info("Successfully replayed DLQ entry (offset: {}, partition: {}) in {}ms", + record.offset(), record.partition(), processingTime); + + } catch (Exception e) { + errorCount.incrementAndGet(); + log.error("Failed to replay DLQ entry (offset: {}, partition: {}). 
Stopping batch processing.", + record.offset(), record.partition(), e); + // Don't commit - message will be reprocessed next time + break; // Stop processing this batch to retry later + } } + } finally { + // RESUME consumption - always do this even if processing failed + consumer.resume(pausedPartitions); + log.info("Resumed consumption on partitions: {}", pausedPartitions); } } catch (Exception e) { - log.error("Error in DLQ replay processing. Consumer might be removed from group due to slow processing", e); + log.error("Error in DLQ replay processing loop", e); try { - // Give more time for recovery - Thread.sleep(30000); // 30 seconds before retrying - - // Try to rejoin the consumer group - consumer.enforceRebalance(); - log.info("Enforced consumer group rebalance after error"); + // Back off before retry + Thread.sleep(10000); // 10 seconds } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - break; - } catch (Exception re) { - log.error("Failed to recover consumer after error", re); - stopReplay(); // Stop processing if we can't recover + log.warn("DLQ replay thread interrupted during error recovery"); break; } } } - log.info("DLQ replay thread finished"); + log.info("DLQ replay thread finished. Total processed: {}, Total errors: {}", + processedCount.get(), errorCount.get()); } /** @@ -324,7 +310,7 @@ private void processMessages() { private void replayDLQEntry(String dlqJson) throws Exception { long startTime = System.currentTimeMillis(); long memoryBefore = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - + try { DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); log.info("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); @@ -366,7 +352,11 @@ private void replayDLQEntry(String dlqJson) throws Exception { } catch (Exception e) { log.warn("Error replaying mutation for index: {}, rolling back transaction", entry.getIndexName(), e); - replayTx.rollback(); + try { + replayTx.rollback(); + } catch (Exception rollbackException) { + log.error("Failed to rollback transaction for index: {}", entry.getIndexName(), rollbackException); + } throw new Exception("Failed to replay mutation for index: " + entry.getIndexName(), e); } } catch (IOException e) { @@ -505,4 +495,4 @@ public void resetCounters() { errorCount.set(0); log.info("DLQ replay counters reset"); } -} +} \ No newline at end of file From d9dd78a1791a62355eafc6b080fb5de83384daf9 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 10 Nov 2025 21:56:41 +0530 Subject: [PATCH 15/23] MLH-1477 improve DLQ handling --- pom.xml | 2 +- .../atlas/web/rest/DLQAdminController.java | 6 - .../atlas/web/service/DLQReplayService.java | 383 ++++++++++-------- 3 files changed, 225 insertions(+), 166 deletions(-) diff --git a/pom.xml b/pom.xml index f8c830f67dc..b602afb91cf 100644 --- a/pom.xml +++ b/pom.xml @@ -720,7 +720,7 @@ 4.4.13 2.13.4.2 2.12.4 - 1.0.2-atlan-SNAPSHOT + 1.0.2-atlan 0.5.3 2.3.1 1 diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java b/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java index ee506dee60b..d3d74592c63 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DLQAdminController.java @@ -26,12 +26,6 @@ public void startReplay() { dlqReplayService.startReplay(); } - @POST - @Path("/replay/stop") - public void stopReplay() { - dlqReplayService.stopReplay(); - } - @GET @Path("/replay/status") 
public Map getStatus() { diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 36c8a5b72e1..d3a4e000ecd 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -13,35 +13,40 @@ import org.apache.atlas.repository.graphdb.janus.AtlasJanusGraph; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.janusgraph.diskstorage.Backend; -import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.BaseTransaction; import org.janusgraph.diskstorage.StandardIndexProvider; +import org.janusgraph.diskstorage.TemporaryBackendException; import org.janusgraph.diskstorage.configuration.Configuration; import org.janusgraph.diskstorage.dlq.DLQEntry; import org.janusgraph.diskstorage.dlq.SerializableIndexMutation; import org.janusgraph.diskstorage.indexing.IndexProvider; +import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures; import org.janusgraph.diskstorage.util.StandardBaseTransactionConfig; import org.janusgraph.diskstorage.indexing.IndexEntry; import org.janusgraph.diskstorage.indexing.IndexMutation; import org.janusgraph.diskstorage.indexing.KeyInformation; import org.janusgraph.diskstorage.es.ElasticSearchIndex; -import org.janusgraph.core.Cardinality; -import org.janusgraph.core.schema.Parameter; import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.database.IndexSerializer; import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.janusgraph.graphdb.database.index.IndexInfoRetriever; +import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.time.Duration; import java.time.Instant; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; /** * Service for replaying DLQ messages back to Elasticsearch. 
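The hunk below replaces the hard-coded consumer tuning with Spring-injected properties. A minimal sketch of that @Value pattern follows, with an illustrative class name; the property keys mirror the patch, and the fields cannot be final because Spring assigns them after construction, which is why the patch also drops final from dlqTopic and consumerGroupId.

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;

    @Service
    public class DlqTuningSketch {
        // "${property:default}" resolves to the configured value, or to the literal
        // after the ':' when the property is absent from the environment
        @Value("${atlas.kafka.dlq.maxRetries:3}")
        private int maxRetries;

        @Value("${atlas.kafka.dlq.maxPollRecords:10}")
        private int maxPollRecords;

        public int getMaxRetries()     { return maxRetries; }
        public int getMaxPollRecords() { return maxPollRecords; }
    }
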
@@ -54,72 +59,69 @@ public class DLQReplayService { private String bootstrapServers; @Value("${atlas.kafka.dlq.topic:ATLAS_ES_DLQ}") - private final String dlqTopic="ATLAS_ES_DLQ"; + private String dlqTopic="ATLAS_ES_DLQ"; @Value("${atlas.kafka.dlq.consumerGroupId:atlas_dq_replay_group}") - private final String consumerGroupId= "atlas_dq_replay_group"; + private String consumerGroupId= "atlas_dq_replay_group"; - private final ElasticSearchIndex esIndex; - private final ObjectMapper mapper; + @Value("${atlas.kafka.dlq.maxRetries:3}") + private int maxRetries = 3; - private ObjectMapper configureMapper() { - ObjectMapper mapper = new ObjectMapper(); - // Configure to handle property name differences - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - // Add custom deserializer for SerializableIndexMutation - SimpleModule module = new SimpleModule(); - module.addDeserializer(SerializableIndexMutation.class, new JsonDeserializer() { - @Override - public SerializableIndexMutation deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { - JsonNode node = p.getCodec().readTree(p); + // Kafka consumer configuration + @Value("${atlas.kafka.dlq.maxPollRecords:10}") + private int maxPollRecords = 10; - // Handle both "new" and "isNew" fields - boolean isNew = node.has("new") ? node.get("new").asBoolean() : - node.has("isNew") ? node.get("isNew").asBoolean() : false; + @Value("${atlas.kafka.dlq.maxPollIntervalMs:600000}") + private int maxPollIntervalMs = 600000; // 10 minutes - boolean isDeleted = node.has("isDeleted") ? node.get("isDeleted").asBoolean() : false; + @Value("${atlas.kafka.dlq.sessionTimeoutMs:90000}") + private int sessionTimeoutMs = 90000; // 90 seconds - List additions = new ArrayList<>(); - List deletions = new ArrayList<>(); + @Value("${atlas.kafka.dlq.heartbeatIntervalMs:30000}") + private int heartbeatIntervalMs = 30000; // 30 seconds - if (node.has("additions") && node.get("additions").isArray()) { - for (JsonNode entry : node.get("additions")) { - additions.add(new SerializableIndexMutation.SerializableIndexEntry( - entry.get("field").asText(), - mapper.treeToValue(entry.get("value"), Object.class) - )); - } - } + // Timing configuration + @Value("${atlas.kafka.dlq.pollTimeoutSeconds:5}") + private int pollTimeoutSeconds = 5; - if (node.has("deletions") && node.get("deletions").isArray()) { - for (JsonNode entry : node.get("deletions")) { - deletions.add(new SerializableIndexMutation.SerializableIndexEntry( - entry.get("field").asText(), - mapper.treeToValue(entry.get("value"), Object.class) - )); - } - } + @Value("${atlas.kafka.dlq.shutdownWaitMs:1000}") + private int shutdownWaitMs = 1000; - return new SerializableIndexMutation(isNew, isDeleted, additions, deletions); - } - }); - mapper.registerModule(module); - return mapper; - } + @Value("${atlas.kafka.dlq.consumerCloseTimeoutSeconds:30}") + private int consumerCloseTimeoutSeconds = 30; + + @Value("${atlas.kafka.dlq.errorBackoffMs:10000}") + private int errorBackoffMs = 10000; // 10 seconds + + private final ElasticSearchIndex esIndex; + protected final IndexSerializer indexSerializer; + private final ObjectMapper mapper; + private GraphDatabaseConfiguration graphConfig; + private StandardJanusGraph standardJanusGraph; + + // Track retry attempts per partition-offset to handle poison pills (in-memory) + private final Map retryTracker = new ConcurrentHashMap<>(); private volatile KafkaConsumer consumer; private final AtomicBoolean isRunning = new AtomicBoolean(false); + private 
final AtomicBoolean isHealthy = new AtomicBoolean(true); + private volatile Thread replayThread; private final AtomicInteger processedCount = new AtomicInteger(0); private final AtomicInteger errorCount = new AtomicInteger(0); + private final AtomicInteger skippedCount = new AtomicInteger(0); - public DLQReplayService(AtlasJanusGraph graph) throws BackendException, AtlasException { + public DLQReplayService(AtlasJanusGraph graph) throws AtlasException { this.mapper = configureMapper(); // Extract ES configuration from existing graph - GraphDatabaseConfiguration graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); + this.graphConfig = ((StandardJanusGraph)graph.getGraph()).getConfiguration(); + this.standardJanusGraph = ((StandardJanusGraph)graph.getGraph()); this.bootstrapServers = ApplicationProperties.get().getString("atlas.graph.kafka.bootstrap.servers"); Configuration fullConfig = graphConfig.getConfiguration(); IndexProvider indexProvider = Backend.getImplementationClass(fullConfig.restrictTo("search"), fullConfig.get(GraphDatabaseConfiguration.INDEX_BACKEND,"search"), StandardIndexProvider.getAllProviderClasses()); + StoreFeatures storeFeatures = graphConfig.getBackend().getStoreFeatures(); + this.indexSerializer = new IndexSerializer(fullConfig, graphConfig.getSerializer(), + graphConfig.getBackend().getIndexInformation(), storeFeatures.isDistributed() && storeFeatures.isKeyOrdered()); esIndex = (ElasticSearchIndex) indexProvider; } @@ -134,26 +136,34 @@ public synchronized void startReplay() { } log.info("Starting DLQ replay service for topic: {} with consumer group: {}", dlqTopic, consumerGroupId); - + Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit after success - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start from beginning + // Keep "earliest" so the full DLQ backlog is replayed if consumer group state is lost + // On first startup with no committed offsets, all existing DLQ entries are processed from the beginning + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Optimized settings for long-running message processing with pause/resume pattern - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // Process one at a time - consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // 10 minutes - safety net - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000"); // 90 seconds - consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000"); // 30 seconds + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords)); + consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollIntervalMs)); + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(sessionTimeoutMs)); + consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatIntervalMs)); consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList(dlqTopic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection partitions) { 
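// onPartitionsRevoked runs before the rebalance completes: once it returns, these
// partitions may be owned by another group member, and any offsets not yet committed
// will be redelivered there. Per-partition in-memory state is therefore stale after
// revocation, which is why the retry tracker is purged below.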
log.warn("Consumer group partitions revoked. Partitions: {}", partitions); + + // Clean up retry tracker for revoked partitions to prevent memory leak + for (TopicPartition partition : partitions) { + retryTracker.keySet().removeIf(key -> key.startsWith(partition.partition() + "-")); + log.info("Cleaned up retry tracker for revoked partition: {}", partition); + } } @Override @@ -183,9 +193,10 @@ public void onPartitionsAssigned(Collection partitions) { }); isRunning.set(true); + isHealthy.set(true); // Start processing in a separate thread - Thread replayThread = new Thread(this::processMessages, "DLQ-Replay-Thread"); + replayThread = new Thread(this::processMessages, "DLQ-Replay-Thread"); replayThread.setDaemon(true); replayThread.start(); @@ -193,24 +204,33 @@ public void onPartitionsAssigned(Collection partitions) { } /** - * Stop replaying DLQ messages + * Gracefully shutdown the DLQ replay service */ - public synchronized void stopReplay() { + @PreDestroy + public synchronized void shutdown() { if (!isRunning.get()) { - log.warn("DLQ replay is not running"); + log.info("DLQ replay service is not running, nothing to shutdown"); return; } - log.info("Stopping DLQ replay service..."); + log.info("Shutting down DLQ replay service..."); isRunning.set(false); + // Close consumer - this will trigger final offset commit and leave the consumer group if (consumer != null) { - consumer.close(); - consumer = null; + try { + consumer.wakeup(); // Interrupt any ongoing poll() + // Give thread time to finish current iteration + Thread.sleep(shutdownWaitMs); + consumer.close(Duration.ofSeconds(consumerCloseTimeoutSeconds)); // Graceful close with timeout + log.info("Kafka consumer closed successfully"); + } catch (Exception e) { + log.error("Error closing Kafka consumer during shutdown", e); + } } - log.info("DLQ replay service stopped. Processed: {}, Errors: {}", - processedCount.get(), errorCount.get()); + log.info("DLQ replay service shutdown complete. 
Total processed: {}, Total errors: {}, Total skipped: {}", + processedCount.get(), errorCount.get(), skippedCount.get()); } /** @@ -219,9 +239,10 @@ public synchronized void stopReplay() { private void processMessages() { log.info("DLQ replay thread started, polling for messages..."); - while (isRunning.get()) { - try { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + try { + while (isRunning.get()) { + try { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(pollTimeoutSeconds)); if (records.isEmpty()) { // Log why we got no records @@ -243,7 +264,7 @@ private void processMessages() { continue; } - log.info("Received {} DLQ messages to replay", records.count()); + log.debug("Received {} DLQ messages to replay", records.count()); // PAUSE consumption immediately to prevent timeout during processing Set pausedPartitions = consumer.assignment(); @@ -253,6 +274,8 @@ private void processMessages() { try { // Now process without time pressure - heartbeats continue automatically for (ConsumerRecord record : records) { + String retryKey = record.partition() + "-" + record.offset(); + try { log.info("Processing DLQ entry at offset: {} from partition: {}", record.offset(), record.partition()); @@ -263,22 +286,59 @@ private void processMessages() { processedCount.incrementAndGet(); - // Commit after each successful message + // Success - remove from retry tracker and commit offset + retryTracker.remove(retryKey); + Map offsets = Collections.singletonMap( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1) ); consumer.commitSync(offsets); - log.info("Successfully replayed DLQ entry (offset: {}, partition: {}) in {}ms", + log.debug("Successfully replayed DLQ entry (offset: {}, partition: {}) in {}ms", record.offset(), record.partition(), processingTime); + } catch (TemporaryBackendException temporaryBackendException) { + // Treat temporary backend exceptions as transient - will retry + errorCount.incrementAndGet(); + log.warn("Temporary backend exception while replaying DLQ entry (offset: {}, partition: {}). " + + "Will retry on next poll. Error: {}", + record.offset(), record.partition(), temporaryBackendException.getMessage()); + Thread.sleep(errorBackoffMs); } catch (Exception e) { errorCount.incrementAndGet(); - log.error("Failed to replay DLQ entry (offset: {}, partition: {}). Stopping batch processing.", - record.offset(), record.partition(), e); - // Don't commit - message will be reprocessed next time - break; // Stop processing this batch to retry later + + // Track retry attempts for this specific offset (in-memory) + int retryCount = retryTracker.getOrDefault(retryKey, 0) + 1; + retryTracker.put(retryKey, retryCount); + + if (retryCount >= maxRetries) { + // Poison pill detected - skip this message to unblock the partition + log.error("DLQ entry at offset {} partition {} failed {} times (max retries reached). " + + "SKIPPING this message to prevent partition blockage. 
Error: {}", + record.offset(), record.partition(), retryCount, e.getMessage(), e); + + skippedCount.incrementAndGet(); + retryTracker.remove(retryKey); + + // Commit offset to move past poison pill + Map offsets = Collections.singletonMap( + new TopicPartition(record.topic(), record.partition()), + new OffsetAndMetadata(record.offset() + 1) + ); + try { + consumer.commitSync(offsets); + log.warn("Committed offset {} to skip poison pill", record.offset() + 1); + } catch (Exception commitEx) { + log.error("Failed to commit offset after skipping poison pill", commitEx); + } + } else { + // Will retry this message on next poll - don't commit offset + log.warn("Failed to replay DLQ entry (offset: {}, partition: {}). Retry {}/{}. " + + "Will retry on next poll. Error: {}", + record.offset(), record.partition(), retryCount, maxRetries, e.getMessage()); + } + // CRITICAL: Don't break - continue processing remaining records in batch } } } finally { @@ -287,21 +347,36 @@ private void processMessages() { log.info("Resumed consumption on partitions: {}", pausedPartitions); } - } catch (Exception e) { - log.error("Error in DLQ replay processing loop", e); - try { - // Back off before retry - Thread.sleep(10000); // 10 seconds - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - log.warn("DLQ replay thread interrupted during error recovery"); + } catch (WakeupException e) { + // Expected during shutdown - exit gracefully + log.info("Kafka consumer wakeup called, exiting processing loop"); break; + } catch (Exception e) { + log.error("Error in DLQ replay processing loop", e); + try { + // Back off before retry + Thread.sleep(errorBackoffMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.warn("DLQ replay thread interrupted during error recovery"); + break; + } } } + } catch (Exception e) { + log.error("Fatal error in DLQ replay thread - marking service as unhealthy", e); + isHealthy.set(false); + } finally { + boolean wasRunning = isRunning.get(); + isRunning.set(false); + + if (wasRunning && !isHealthy.get()) { + log.error("DLQ replay thread terminated unexpectedly! Service is unhealthy. Pod should be restarted."); + } + + log.info("DLQ replay thread finished. Total processed: {}, Total errors: {}, Total skipped: {}", + processedCount.get(), errorCount.get(), skippedCount.get()); } - - log.info("DLQ replay thread finished. 
Total processed: {}, Total errors: {}", - processedCount.get(), errorCount.get()); } /** @@ -315,15 +390,17 @@ private void replayDLQEntry(String dlqJson) throws Exception { DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); log.info("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); + // Create key information retriever + log.info("Creating key information retriever for index: {}", entry.getIndexName()); + StandardJanusGraphTx standardJanusGraphTx = (StandardJanusGraphTx) this.standardJanusGraph.newTransaction(); + IndexInfoRetriever keyInfo = this.indexSerializer.getIndexInfoRetriever(standardJanusGraphTx); + + // Reconstruct mutations from serialized form log.info("Starting mutation reconstruction for index: {}", entry.getIndexName()); - Map> mutations = reconstructMutations(entry); + Map> mutations = reconstructMutations(entry, keyInfo.get("search")); log.info("Completed mutation reconstruction in {}ms", System.currentTimeMillis() - startTime); - // Create key information retriever - log.info("Creating key information retriever for index: {}", entry.getIndexName()); - KeyInformation.IndexRetriever keyInfo = createKeyInfoRetriever(entry); - // Create a new transaction for replay log.info("Beginning transaction for index: {}", entry.getIndexName()); BaseTransaction replayTx = esIndex.beginTransaction( @@ -334,7 +411,7 @@ private void replayDLQEntry(String dlqJson) throws Exception { // This is the same method that originally failed - now we're replaying it! log.info("Starting ES mutation for index: {}", entry.getIndexName()); long mutateStartTime = System.currentTimeMillis(); - esIndex.mutate(mutations, keyInfo, replayTx); + esIndex.mutate(mutations, keyInfo.get("search"), replayTx); log.info("ES mutation completed in {}ms, committing transaction", System.currentTimeMillis() - mutateStartTime); long commitStartTime = System.currentTimeMillis(); @@ -368,7 +445,7 @@ private void replayDLQEntry(String dlqJson) throws Exception { /** * Reconstruct IndexMutation objects from serialized form */ - private Map> reconstructMutations(DLQEntry entry) { + private Map> reconstructMutations(DLQEntry entry, KeyInformation.IndexRetriever indexRetriever) { Map> result = new HashMap<>(); for (Map.Entry> storeEntry : @@ -385,7 +462,7 @@ private Map> reconstructMutations(DLQEntry en // Reconstruct IndexMutation IndexMutation mutation = new IndexMutation( - createStoreRetriever(storeName), // This is simplified - you may need more context + indexRetriever.get(storeName), serMut.isNew(), serMut.isDeleted() ); @@ -410,68 +487,13 @@ private Map> reconstructMutations(DLQEntry en } /** - * Create key information retriever for replay + * Health check for liveness/readiness probes + * @return true if the replay thread is healthy and running */ - private KeyInformation.IndexRetriever createKeyInfoRetriever(DLQEntry entry) { - return new KeyInformation.IndexRetriever() { - @Override - public KeyInformation.StoreRetriever get(String store) { - return new KeyInformation.StoreRetriever() { - @Override - public KeyInformation get(String key) { - // This is a simplified implementation - // In practice, you might need to store more schema information in the DLQ - return createKeyInformation(store); - } - }; - } - - @Override - public KeyInformation get(String store, String key) { - return createKeyInformation(store); - } - - @Override - public void invalidate(String store) { - // No-op for replay - } - }; - } - - /** - * Create a basic KeyInformation object - */ - private 
KeyInformation createKeyInformation(String store) { - // This is a simplified implementation - // You might need to enhance this based on your schema requirements - return new KeyInformation() { - @Override - public Class getDataType() { - return String.class; - } - - @Override - public Parameter[] getParameters() { - return new Parameter[0]; - } - - @Override - public Cardinality getCardinality() { - return Cardinality.SINGLE; - } - }; - } - - /** - * Create a store retriever for IndexMutation - */ - private KeyInformation.StoreRetriever createStoreRetriever(String store) { - return new KeyInformation.StoreRetriever() { - @Override - public KeyInformation get(String key) { - return createKeyInformation(store); - } - }; + public boolean isHealthy() { + // Check if thread is alive and healthy + boolean threadAlive = replayThread != null && replayThread.isAlive(); + return isHealthy.get() && threadAlive; } /** @@ -480,19 +502,62 @@ public KeyInformation get(String key) { public Map getStatus() { Map status = new HashMap<>(); status.put("isRunning", isRunning.get()); + status.put("isHealthy", isHealthy()); + status.put("threadAlive", replayThread != null && replayThread.isAlive()); status.put("processedCount", processedCount.get()); status.put("errorCount", errorCount.get()); + status.put("skippedCount", skippedCount.get()); status.put("topic", dlqTopic); status.put("consumerGroup", consumerGroupId); + status.put("maxRetries", maxRetries); + status.put("activeRetries", retryTracker.size()); + return status; } - /** - * Reset counters - */ - public void resetCounters() { - processedCount.set(0); - errorCount.set(0); - log.info("DLQ replay counters reset"); + private ObjectMapper configureMapper() { + ObjectMapper mapper = new ObjectMapper(); + // Configure to handle property name differences + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + // Add custom deserializer for SerializableIndexMutation + SimpleModule module = new SimpleModule(); + module.addDeserializer(SerializableIndexMutation.class, new JsonDeserializer<>() { + @Override + public SerializableIndexMutation deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + + // Handle both "new" and "isNew" fields + boolean isNew = node.has("new") ? node.get("new").asBoolean() : + node.has("isNew") ? node.get("isNew").asBoolean() : false; + + boolean isDeleted = node.has("isDeleted") ? 
node.get("isDeleted").asBoolean() : false; + + List additions = new ArrayList<>(); + List deletions = new ArrayList<>(); + + if (node.has("additions") && node.get("additions").isArray()) { + for (JsonNode entry : node.get("additions")) { + additions.add(new SerializableIndexMutation.SerializableIndexEntry( + entry.get("field").asText(), + mapper.treeToValue(entry.get("value"), Object.class) + )); + } + } + + if (node.has("deletions") && node.get("deletions").isArray()) { + for (JsonNode entry : node.get("deletions")) { + deletions.add(new SerializableIndexMutation.SerializableIndexEntry( + entry.get("field").asText(), + mapper.treeToValue(entry.get("value"), Object.class) + )); + } + } + + return new SerializableIndexMutation(isNew, isDeleted, additions, deletions); + } + }); + mapper.registerModule(module); + return mapper; } + } \ No newline at end of file From cae1ad03d1707b25472e7467667904f066e75ec2 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 10 Nov 2025 22:11:42 +0530 Subject: [PATCH 16/23] MLH-1477 break on errors --- .../apache/atlas/web/service/DLQReplayService.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index d3a4e000ecd..3fe387a8d34 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -302,9 +302,10 @@ private void processMessages() { // Treat temporary backend exceptions as transient - will retry errorCount.incrementAndGet(); log.warn("Temporary backend exception while replaying DLQ entry (offset: {}, partition: {}). " + - "Will retry on next poll. Error: {}", + "Will retry on next poll. STOPPING batch processing to prevent skipping this message. Error: {}", record.offset(), record.partition(), temporaryBackendException.getMessage()); Thread.sleep(errorBackoffMs); + break; } catch (Exception e) { errorCount.incrementAndGet(); @@ -332,13 +333,15 @@ private void processMessages() { } catch (Exception commitEx) { log.error("Failed to commit offset after skipping poison pill", commitEx); } + // Continue to next record after skipping poison pill } else { // Will retry this message on next poll - don't commit offset log.warn("Failed to replay DLQ entry (offset: {}, partition: {}). Retry {}/{}. " + - "Will retry on next poll. Error: {}", + "STOPPING batch processing to prevent skipping this message. Will retry on next poll. 
Error: {}", record.offset(), record.partition(), retryCount, maxRetries, e.getMessage()); + // CRITICAL: Break to prevent subsequent records from committing offsets past this failed record + break; } - // CRITICAL: Don't break - continue processing remaining records in batch } } } finally { @@ -434,7 +437,7 @@ private void replayDLQEntry(String dlqJson) throws Exception { } catch (Exception rollbackException) { log.error("Failed to rollback transaction for index: {}", entry.getIndexName(), rollbackException); } - throw new Exception("Failed to replay mutation for index: " + entry.getIndexName(), e); + throw new TemporaryBackendException("Failed to replay mutation for index: " + entry.getIndexName(), e); } } catch (IOException e) { log.error("Failed to deserialize DLQ entry JSON", e); From 40535e5c5dc56333bee60f3dd6a052abe29ccb61 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 11 Nov 2025 00:43:59 +0530 Subject: [PATCH 17/23] MLH-1477 seek back when error --- .../atlas/web/service/DLQReplayService.java | 123 +++++++++++++----- 1 file changed, 89 insertions(+), 34 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java index 3fe387a8d34..4e3f8e7652a 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/DLQReplayService.java @@ -110,6 +110,8 @@ public class DLQReplayService { private final AtomicInteger errorCount = new AtomicInteger(0); private final AtomicInteger skippedCount = new AtomicInteger(0); + private final static String INDEX_NAME = "search"; + public DLQReplayService(AtlasJanusGraph graph) throws AtlasException { this.mapper = configureMapper(); // Extract ES configuration from existing graph @@ -117,7 +119,7 @@ public DLQReplayService(AtlasJanusGraph graph) throws AtlasException { this.standardJanusGraph = ((StandardJanusGraph)graph.getGraph()); this.bootstrapServers = ApplicationProperties.get().getString("atlas.graph.kafka.bootstrap.servers"); Configuration fullConfig = graphConfig.getConfiguration(); - IndexProvider indexProvider = Backend.getImplementationClass(fullConfig.restrictTo("search"), fullConfig.get(GraphDatabaseConfiguration.INDEX_BACKEND,"search"), + IndexProvider indexProvider = Backend.getImplementationClass(fullConfig.restrictTo(INDEX_NAME), fullConfig.get(GraphDatabaseConfiguration.INDEX_BACKEND,INDEX_NAME), StandardIndexProvider.getAllProviderClasses()); StoreFeatures storeFeatures = graphConfig.getBackend().getStoreFeatures(); this.indexSerializer = new IndexSerializer(fullConfig, graphConfig.getSerializer(), @@ -271,6 +273,9 @@ private void processMessages() { consumer.pause(pausedPartitions); log.info("Paused consumption on partitions: {} to process messages", pausedPartitions); + Long failedOffset = null; // Track if we need to seek back + TopicPartition failedPartition = null; + try { // Now process without time pressure - heartbeats continue automatically for (ConsumerRecord record : records) { @@ -304,6 +309,11 @@ private void processMessages() { log.warn("Temporary backend exception while replaying DLQ entry (offset: {}, partition: {}). " + "Will retry on next poll. STOPPING batch processing to prevent skipping this message. 
Error: {}", record.offset(), record.partition(), temporaryBackendException.getMessage()); + + // Mark for seek-back - consumer position has already advanced past this offset + failedOffset = record.offset(); + failedPartition = new TopicPartition(record.topic(), record.partition()); + Thread.sleep(errorBackoffMs); break; } catch (Exception e) { @@ -339,12 +349,30 @@ private void processMessages() { log.warn("Failed to replay DLQ entry (offset: {}, partition: {}). Retry {}/{}. " + "STOPPING batch processing to prevent skipping this message. Will retry on next poll. Error: {}", record.offset(), record.partition(), retryCount, maxRetries, e.getMessage()); - // CRITICAL: Break to prevent subsequent records from committing offsets past this failed record + + // Mark for seek-back - consumer position has already advanced past this offset + failedOffset = record.offset(); + failedPartition = new TopicPartition(record.topic(), record.partition()); + break; } } } } finally { + // CRITICAL: Seek back to failed offset before resuming + // The consumer's position advances when records are polled, not when committed + // We need to rewind to retry the failed message + if (failedOffset != null && failedPartition != null) { + try { + consumer.seek(failedPartition, failedOffset); + log.info("Seeked back to offset {} on partition {} to retry failed message", + failedOffset, failedPartition); + } catch (Exception seekEx) { + log.error("Failed to seek back to offset {} on partition {}. Message may be skipped!", + failedOffset, failedPartition, seekEx); + } + } + // RESUME consumption - always do this even if processing failed consumer.resume(pausedPartitions); log.info("Resumed consumption on partitions: {}", pausedPartitions); @@ -387,64 +415,91 @@ private void processMessages() { */ private void replayDLQEntry(String dlqJson) throws Exception { long startTime = System.currentTimeMillis(); - long memoryBefore = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - + StandardJanusGraphTx standardJanusGraphTx = null; + BaseTransaction replayTx = null; + try { DLQEntry entry = mapper.readValue(dlqJson, DLQEntry.class); log.info("Replaying DLQ entry for index: {}, store: {}", entry.getIndexName(), entry.getStoreName()); // Create key information retriever log.info("Creating key information retriever for index: {}", entry.getIndexName()); - StandardJanusGraphTx standardJanusGraphTx = (StandardJanusGraphTx) this.standardJanusGraph.newTransaction(); + standardJanusGraphTx = (StandardJanusGraphTx) this.standardJanusGraph.newTransaction(); IndexInfoRetriever keyInfo = this.indexSerializer.getIndexInfoRetriever(standardJanusGraphTx); - // Reconstruct mutations from serialized form log.info("Starting mutation reconstruction for index: {}", entry.getIndexName()); - Map> mutations = reconstructMutations(entry, keyInfo.get("search")); + Map> mutations = reconstructMutations(entry, keyInfo.get(INDEX_NAME)); log.info("Completed mutation reconstruction in {}ms", System.currentTimeMillis() - startTime); // Create a new transaction for replay log.info("Beginning transaction for index: {}", entry.getIndexName()); - BaseTransaction replayTx = esIndex.beginTransaction( + replayTx = esIndex.beginTransaction( new StandardBaseTransactionConfig.Builder().commitTime(Instant.now()).build() ); - try { - // This is the same method that originally failed - now we're replaying it! 
- log.info("Starting ES mutation for index: {}", entry.getIndexName()); - long mutateStartTime = System.currentTimeMillis(); - esIndex.mutate(mutations, keyInfo.get("search"), replayTx); - log.info("ES mutation completed in {}ms, committing transaction", System.currentTimeMillis() - mutateStartTime); + // This is the same method that originally failed - now we're replaying it! + log.info("Starting ES mutation for index: {}", entry.getIndexName()); + long mutateStartTime = System.currentTimeMillis(); - long commitStartTime = System.currentTimeMillis(); - replayTx.commit(); - log.info("Transaction commit completed in {}ms", System.currentTimeMillis() - commitStartTime); + esIndex.mutate(mutations, keyInfo.get(INDEX_NAME), replayTx); - long memoryAfter = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); - long memoryUsed = memoryAfter - memoryBefore; - long totalTime = System.currentTimeMillis() - startTime; + log.info("ES mutation completed in {}ms, committing ES transaction", System.currentTimeMillis() - mutateStartTime); + long commitStartTime = System.currentTimeMillis(); - log.info("Successfully replayed mutation for index: {}. Total time: {}ms, Memory used: {}MB, Current heap usage: {}MB", - entry.getIndexName(), totalTime, memoryUsed / (1024 * 1024), - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / (1024 * 1024)); + replayTx.commit(); + log.info("ES transaction commit completed in {}ms", System.currentTimeMillis() - commitStartTime); + + // Only commit JanusGraph transaction if ES mutation succeeded + standardJanusGraphTx.commit(); + log.info("JanusGraph transaction committed successfully"); + + long totalTime = System.currentTimeMillis() - startTime; + log.info("Successfully replayed mutation for index: {}. Total time: {}ms", entry.getIndexName(), totalTime); - } catch (Exception e) { - log.warn("Error replaying mutation for index: {}, rolling back transaction", - entry.getIndexName(), e); - try { - replayTx.rollback(); - } catch (Exception rollbackException) { - log.error("Failed to rollback transaction for index: {}", entry.getIndexName(), rollbackException); - } - throw new TemporaryBackendException("Failed to replay mutation for index: " + entry.getIndexName(), e); - } + } catch (TemporaryBackendException e) { + // Already a TemporaryBackendException from JanusGraph - rethrow as-is + log.warn("Temporary backend exception replaying DLQ entry: {}", e.getMessage()); + cleanupFailedTransactions(replayTx, standardJanusGraphTx); + throw e; } catch (IOException e) { - log.error("Failed to deserialize DLQ entry JSON", e); + // JSON deserialization error - permanent failure + log.error("Failed to deserialize DLQ entry JSON - permanent failure", e); + cleanupFailedTransactions(replayTx, standardJanusGraphTx); + throw e; + } catch (Exception e) { + // Other exceptions - might be permanent (bad data, schema issues, etc.) + log.error("Error replaying DLQ entry - treating as permanent failure", e); + cleanupFailedTransactions(replayTx, standardJanusGraphTx); throw e; } } + /** + * Cleanup transactions when replay fails + */ + private void cleanupFailedTransactions(BaseTransaction replayTx, StandardJanusGraphTx standardJanusGraphTx) { + // Rollback ES transaction + if (replayTx != null) { + try { + replayTx.rollback(); + log.debug("Rolled back ES transaction"); + } catch (Exception rollbackException) { + log.error("Failed to rollback ES transaction", rollbackException); + } + } + + // Rollback JanusGraph transaction (don't commit on failure!) 
+        // Rollback ES transaction
+        if (replayTx != null) {
+            try {
+                replayTx.rollback();
+                log.debug("Rolled back ES transaction");
+            } catch (Exception rollbackException) {
+                log.error("Failed to rollback ES transaction", rollbackException);
+            }
+        }
+
+        // Rollback JanusGraph transaction (don't commit on failure!)
+        if (standardJanusGraphTx != null) {
+            try {
+                standardJanusGraphTx.rollback();
+                log.debug("Rolled back JanusGraph transaction");
+            } catch (Exception rollbackException) {
+                log.error("Failed to rollback JanusGraph transaction", rollbackException);
+            }
+        }
+    }
+
     /**
      * Reconstruct IndexMutation objects from serialized form
      */
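Aside: the hunks above rely on a Kafka consumer subtlety: KafkaConsumer.poll() advances the fetch position as soon as records are returned, so a failed record is silently skipped unless the consumer seeks back before the next poll. Below is a minimal, self-contained sketch of the pause/seek/resume pattern these patches converge on; processRecord is a placeholder for the replay call, and the sketch is illustrative only, not part of the patch series.

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class PauseSeekResumeSketch {

        // Placeholder for the actual replay logic (e.g. replayDLQEntry)
        void processRecord(ConsumerRecord<String, String> record) throws Exception { }

        // One drain cycle: poll() advances the consumer position immediately, so a
        // failed record must be reclaimed with seek(); pause() keeps poll() from
        // fetching more records while the batch is in flight.
        void drainOnce(KafkaConsumer<String, String> consumer) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) {
                return;
            }

            consumer.pause(consumer.assignment());
            TopicPartition failedPartition = null;
            long failedOffset = -1L;
            try {
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processRecord(record);
                        // Commit only after success, one record at a time
                        consumer.commitSync(Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    } catch (Exception e) {
                        // Stop the batch so later records cannot commit past the failure
                        failedPartition = new TopicPartition(record.topic(), record.partition());
                        failedOffset = record.offset();
                        break;
                    }
                }
            } finally {
                if (failedPartition != null) {
                    // Rewind so the next poll() re-delivers the failed record
                    consumer.seek(failedPartition, failedOffset);
                }
                consumer.resume(consumer.assignment());
            }
        }
    }
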
Partitions: {}", partitions); - // Clean up retry tracker for revoked partitions to prevent memory leak + // Clean up retry tracker and backoff tracker for revoked partitions to prevent memory leak for (TopicPartition partition : partitions) { - retryTracker.keySet().removeIf(key -> key.startsWith(partition.partition() + "-")); - log.info("Cleaned up retry tracker for revoked partition: {}", partition); + String partitionPrefix = partition.partition() + "-"; + retryTracker.keySet().removeIf(key -> key.startsWith(partitionPrefix)); + backoffTracker.keySet().removeIf(key -> key.startsWith(partitionPrefix)); + log.info("Cleaned up retry and backoff trackers for revoked partition: {}", partition); } } @@ -291,8 +306,9 @@ private void processMessages() { processedCount.incrementAndGet(); - // Success - remove from retry tracker and commit offset + // Success - remove from retry tracker, reset backoff, and commit offset retryTracker.remove(retryKey); + resetExponentialBackoff(retryKey); Map offsets = Collections.singletonMap( new TopicPartition(record.topic(), record.partition()), @@ -304,17 +320,21 @@ private void processMessages() { record.offset(), record.partition(), processingTime); } catch (TemporaryBackendException temporaryBackendException) { - // Treat temporary backend exceptions as transient - will retry + // Treat temporary backend exceptions as transient - will retry with exponential backoff errorCount.incrementAndGet(); + + // Calculate exponential backoff delay + long backoffDelay = calculateExponentialBackoff(retryKey); + log.warn("Temporary backend exception while replaying DLQ entry (offset: {}, partition: {}). " + - "Will retry on next poll. STOPPING batch processing to prevent skipping this message. Error: {}", - record.offset(), record.partition(), temporaryBackendException.getMessage()); + "Will retry on next poll after {}ms backoff (exponential). STOPPING batch processing to prevent skipping this message. 
Error: {}", + record.offset(), record.partition(), backoffDelay, temporaryBackendException.getMessage()); // Mark for seek-back - consumer position has already advanced past this offset failedOffset = record.offset(); failedPartition = new TopicPartition(record.topic(), record.partition()); - Thread.sleep(errorBackoffMs); + Thread.sleep(backoffDelay); break; } catch (Exception e) { errorCount.incrementAndGet(); @@ -544,6 +564,32 @@ private Map> reconstructMutations(DLQEntry en return result; } + /** + * Calculate exponential backoff delay for a failed message + * @param retryKey The partition-offset key + * @return The delay in milliseconds + */ + private long calculateExponentialBackoff(String retryKey) { + long currentDelay = backoffTracker.getOrDefault(retryKey, (long) exponentialBackoffBaseDelayMs); + long nextDelay = (long) (currentDelay * exponentialBackoffMultiplier); + + // Cap at maximum delay + nextDelay = Math.min(nextDelay, exponentialBackoffMaxDelayMs); + + // Store for next time + backoffTracker.put(retryKey, nextDelay); + + return currentDelay; // Return current delay, store next delay for future use + } + + /** + * Reset exponential backoff for a successfully processed message + * @param retryKey The partition-offset key + */ + private void resetExponentialBackoff(String retryKey) { + backoffTracker.remove(retryKey); + } + /** * Health check for liveness/readiness probes * @return true if the replay thread is healthy and running @@ -569,6 +615,14 @@ public Map getStatus() { status.put("consumerGroup", consumerGroupId); status.put("maxRetries", maxRetries); status.put("activeRetries", retryTracker.size()); + status.put("activeBackoffs", backoffTracker.size()); + + // Exponential backoff configuration + Map backoffConfig = new HashMap<>(); + backoffConfig.put("baseDelayMs", exponentialBackoffBaseDelayMs); + backoffConfig.put("maxDelayMs", exponentialBackoffMaxDelayMs); + backoffConfig.put("multiplier", exponentialBackoffMultiplier); + status.put("exponentialBackoffConfig", backoffConfig); return status; } From 6a1f006a1bc38d2fc985a04525af21f362186798 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 11 Nov 2025 02:46:45 +0530 Subject: [PATCH 19/23] MLH-1477 add tests --- webapp/pom.xml | 12 +- .../DLQReplayServiceIntegrationTest.java | 415 ++++++++++++++ .../web/service/DLQReplayServiceTest.java | 512 ++++++++++++++++++ 3 files changed, 938 insertions(+), 1 deletion(-) create mode 100644 webapp/src/test/java/org/apache/atlas/web/service/DLQReplayServiceIntegrationTest.java create mode 100644 webapp/src/test/java/org/apache/atlas/web/service/DLQReplayServiceTest.java diff --git a/webapp/pom.xml b/webapp/pom.xml index 71b6ddaed5e..613c01fc4d5 100755 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -375,9 +375,19 @@ jersey-multipart + org.mockito - mockito-all + mockito-core + 4.11.0 + test + + + + org.mockito + mockito-junit-jupiter + 4.11.0 + test