diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 508f83c9495..4062152755e 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -90,7 +90,7 @@ public enum AtlasConfiguration { TASKS_REQUEUE_GRAPH_QUERY("atlas.tasks.requeue.graph.query", false), TASKS_IN_PROGRESS_GRAPH_QUERY("atlas.tasks.inprogress.graph.query", false), TASKS_REQUEUE_POLL_INTERVAL("atlas.tasks.requeue.poll.interval.millis", 60000), - TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 1000), + TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 10), SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1), UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true), TASKS_GRAPH_COMMIT_CHUNK_SIZE("atlas.tasks.graph.commit.chunk.size", 100), @@ -164,10 +164,10 @@ public enum AtlasConfiguration { MIN_TIMEOUT_SUPER_VERTEX("atlas.jg.super.vertex.min.edge.timeout", 2), // Classification propagation thread pool configuration - TAG_ASYNC_NOTIFIER_CORE_POOL_SIZE("atlas.classification.propagation.core.pool.size", 32), - TAG_ASYNC_NOTIFIER_MAX_POOL_SIZE("atlas.classification.propagation.max.pool.size", 200), - TAG_ASYNC_NOTIFIER_QUEUE_CAPACITY("atlas.classification.propagation.queue.capacity", 1000), - TAG_ASYNC_NOTIFIER_KEEP_ALIVE_SECONDS("atlas.classification.propagation.keep.alive.seconds", 300), + TAG_ASYNC_NOTIFIER_CORE_POOL_SIZE("atlas.classification.propagation.core.pool.size", 2), // Reduced + TAG_ASYNC_NOTIFIER_MAX_POOL_SIZE("atlas.classification.propagation.max.pool.size", 4), // Reduced + TAG_ASYNC_NOTIFIER_QUEUE_CAPACITY("atlas.classification.propagation.queue.capacity", 100), // Reduced + TAG_ASYNC_NOTIFIER_KEEP_ALIVE_SECONDS("atlas.classification.propagation.keep.alive.seconds", 60), // Reduced // ES and Cassandra batch operation configurations ES_BULK_BATCH_SIZE("atlas.es.bulk.batch.size", 500), @@ -176,7 +176,13 @@ public enum AtlasConfiguration { ES_RETRY_DELAY_MS("atlas.es.retry.delay.ms", 1000), - MIN_EDGES_SUPER_VERTEX("atlas.jg.super.vertex.min.edge.count", 100); + MIN_EDGES_SUPER_VERTEX("atlas.jg.super.vertex.min.edge.count", 100), + + // Task resource management configuration + TASK_MEMORY_THRESHOLD_PERCENT("atlas.tasks.memory.threshold.percent", 75), + TASK_HIGH_MEMORY_PAUSE_MS("atlas.tasks.high.memory.pause.ms", 2000), + TASK_MAX_RETRY_ATTEMPTS("atlas.tasks.max.retry.attempts", 3); + private static final Configuration APPLICATION_PROPERTIES; static { diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java index 10a02999158..8dce1a29318 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java @@ -91,6 +91,10 @@ public void stopQueueWatcher() { } } + public boolean isProcessingTasks() { + return watcher != null && watcher.isProcessingTasks(); + } + static class TaskConsumer implements Runnable { private static final int MAX_ATTEMPT_COUNT = 3; diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java index 05b63206793..6ff0f007fb1 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java @@ -107,6 +107,10 @@ public boolean isWatcherActive() { return watcherThread != null; } + public boolean isProcessingTasks() { + return taskExecutor != null && taskExecutor.isProcessingTasks(); + } + @Override public void stop() throws AtlasException { stopQueueWatcher(); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java index 4010ee5650b..178da06feee 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java @@ -30,10 +30,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -59,6 +56,7 @@ public class TaskQueueWatcher implements Runnable { private static final String ATLAS_TASK_LOCK = "atlas:task:lock"; private final AtomicBoolean shouldRun = new AtomicBoolean(false); + private final AtomicBoolean isProcessingTasks = new AtomicBoolean(false); public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry, Map taskTypeFactoryMap, TaskManagement.Statistics statistics, @@ -82,6 +80,10 @@ public void shutdown() { LOG.info("TaskQueueWatcher: Shutdown"); } + public boolean isProcessingTasks() { + return isProcessingTasks.get(); + } + @Override public void run() { boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean(); @@ -115,10 +117,15 @@ public void run() { taskMetricsService.updateQueueSize(tasks != null ? tasks.size() : 0); if (CollectionUtils.isNotEmpty(tasks)) { - final CountDownLatch latch = new CountDownLatch(tasks.size()); - submitAll(tasks, latch); - LOG.info("Submitted {} tasks to the queue", tasks.size()); - waitForTasksToComplete(latch); + isProcessingTasks.set(true); + try { + final CountDownLatch latch = new CountDownLatch(tasks.size()); + submitAll(tasks, latch); + LOG.info("Submitted {} tasks to the queue", tasks.size()); + waitForTasksToComplete(latch); + } finally { + isProcessingTasks.set(false); + } } else { LOG.info("TaskQueueWatcher: No tasks fetched during this cycle."); } @@ -154,23 +161,77 @@ private void waitForTasksToComplete(final CountDownLatch latch) throws Interrupt } } + private void submitAll(List tasks, CountDownLatch latch) { - if (CollectionUtils.isNotEmpty(tasks)) { + if (CollectionUtils.isEmpty(tasks)) { + LOG.info("TasksFetcher: No task to queue"); + return; + } + + int submittedCount = 0; + + for (AtlasTask task : tasks) { + if (task == null) { + continue; + } - for (AtlasTask task : tasks) { - if (task != null) { + String taskGuid = task.getGuid(); + boolean taskSubmitted = false; + + // Keep trying until the task is submitted + while (!taskSubmitted) { + if (isMemoryTooHigh()) { + LOG.warn("High memory usage detected ({}%), pausing task submission for task: {}", + getMemoryUsagePercent() * 100, taskGuid); + + try { + // Wait for memory to be freed + Thread.sleep(AtlasConfiguration.TASK_HIGH_MEMORY_PAUSE_MS.getLong()); + + // Suggest GC if memory is still high after initial wait + if (isMemoryTooHigh()) { + LOG.info("Memory still high after pause, suggesting garbage collection"); + System.gc(); + Thread.sleep(1000); // Give GC time to work + } + } catch (InterruptedException e) { + LOG.warn("Sleep interrupted while waiting for memory to free", e); + Thread.currentThread().interrupt(); + return; // Exit if interrupted + } + } else { + // Memory is okay, submit the task TASK_LOG.log(task); + this.executorService.submit(new TaskExecutor.TaskConsumer(task, + this.registry, this.taskTypeFactoryMap, this.statistics, latch)); + + taskSubmitted = true; + submittedCount++; + LOG.debug("Successfully submitted task: {}", taskGuid); } - - this.executorService.submit(new TaskExecutor.TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics, latch)); } + } - LOG.info("TasksFetcher: Submitted {} tasks to the queue", tasks.size()); - } else { - LOG.info("TasksFetcher: No task to queue"); + if (submittedCount > 0) { + LOG.info("TasksFetcher: Submitted {} tasks to the queue", submittedCount); } } + private boolean isMemoryTooHigh() { + return getMemoryUsagePercent() > (AtlasConfiguration.TASK_MEMORY_THRESHOLD_PERCENT.getInt() / 100.0); + } + + private double getMemoryUsagePercent() { + Runtime runtime = Runtime.getRuntime(); + long maxMemory = runtime.maxMemory(); + long totalMemory = runtime.totalMemory(); + long freeMemory = runtime.freeMemory(); + long usedMemory = totalMemory - freeMemory; + + return (double) usedMemory / maxMemory; + } + + static class TasksFetcher { private TaskRegistry registry; private List tasks = new ArrayList<>(); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java index d16a8a2d3e8..1741b0af6ce 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java @@ -416,8 +416,7 @@ public List getTasksForReQueueGraphQuery() { public List getTasksForReQueueIndexSearch() { DirectIndexQueryResult indexQueryResult = null; List ret = new ArrayList<>(); - - int size = 1000; + int size = AtlasConfiguration.TASKS_QUEUE_SIZE.getInt(); int from = 0; IndexSearchParams indexSearchParams = new IndexSearchParams(); diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index d1506218f67..869b37fbd38 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -291,7 +291,6 @@ public Response getStatus() { put(STATUS, serviceState.getState().toString()); }}; - Response response = Response.ok(AtlasJson.toV1Json(responseData)).build(); if (LOG.isDebugEnabled()) { @@ -301,6 +300,49 @@ public Response getStatus() { return response; } + @GET + @Path("ready") + @Produces(Servlets.JSON_MEDIA_TYPE) + @Timed + public Response getReadyStatus() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AdminResource.getReadyStatus()"); + } + + // Check if tasks are being processed + boolean isProcessingTasks = taskManagement.isProcessingTasks(); + + // Check memory usage + Runtime runtime = Runtime.getRuntime(); + long maxMemory = runtime.maxMemory(); + long totalMemory = runtime.totalMemory(); + long freeMemory = runtime.freeMemory(); + long usedMemory = totalMemory - freeMemory; + double memoryUsage = (double) usedMemory / maxMemory; + + Map responseData = new HashMap() {{ + put("isProcessingTasks", isProcessingTasks); + put("memoryUsage", String.format("%.2f", memoryUsage * 100)); + put("status", serviceState.getState().toString()); + }}; + + Response.ResponseBuilder builder; + + // Return 503 if processing tasks or high memory + if (isProcessingTasks || memoryUsage > 0.75) { + builder = Response.status(Response.Status.SERVICE_UNAVAILABLE) + .entity(AtlasJson.toV1Json(responseData)); + } else { + builder = Response.ok(AtlasJson.toV1Json(responseData)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AdminResource.getReadyStatus()"); + } + + return builder.build(); + } + @GET @Path("session") @Produces(Servlets.JSON_MEDIA_TYPE)