18 changes: 12 additions & 6 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -90,7 +90,7 @@ public enum AtlasConfiguration {
TASKS_REQUEUE_GRAPH_QUERY("atlas.tasks.requeue.graph.query", false),
TASKS_IN_PROGRESS_GRAPH_QUERY("atlas.tasks.inprogress.graph.query", false),
TASKS_REQUEUE_POLL_INTERVAL("atlas.tasks.requeue.poll.interval.millis", 60000),
TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 1000),
TASKS_QUEUE_SIZE("atlas.tasks.queue.size", 10),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
TASKS_GRAPH_COMMIT_CHUNK_SIZE("atlas.tasks.graph.commit.chunk.size", 100),
@@ -164,10 +164,10 @@ public enum AtlasConfiguration {
MIN_TIMEOUT_SUPER_VERTEX("atlas.jg.super.vertex.min.edge.timeout", 2),

// Classification propagation thread pool configuration
TAG_ASYNC_NOTIFIER_CORE_POOL_SIZE("atlas.classification.propagation.core.pool.size", 32),
TAG_ASYNC_NOTIFIER_MAX_POOL_SIZE("atlas.classification.propagation.max.pool.size", 200),
TAG_ASYNC_NOTIFIER_QUEUE_CAPACITY("atlas.classification.propagation.queue.capacity", 1000),
TAG_ASYNC_NOTIFIER_KEEP_ALIVE_SECONDS("atlas.classification.propagation.keep.alive.seconds", 300),
TAG_ASYNC_NOTIFIER_CORE_POOL_SIZE("atlas.classification.propagation.core.pool.size", 2), // Reduced
TAG_ASYNC_NOTIFIER_MAX_POOL_SIZE("atlas.classification.propagation.max.pool.size", 4), // Reduced
TAG_ASYNC_NOTIFIER_QUEUE_CAPACITY("atlas.classification.propagation.queue.capacity", 100), // Reduced
TAG_ASYNC_NOTIFIER_KEEP_ALIVE_SECONDS("atlas.classification.propagation.keep.alive.seconds", 60), // Reduced

// ES and Cassandra batch operation configurations
ES_BULK_BATCH_SIZE("atlas.es.bulk.batch.size", 500),
@@ -176,7 +176,13 @@
ES_RETRY_DELAY_MS("atlas.es.retry.delay.ms", 1000),


MIN_EDGES_SUPER_VERTEX("atlas.jg.super.vertex.min.edge.count", 100);
MIN_EDGES_SUPER_VERTEX("atlas.jg.super.vertex.min.edge.count", 100),

// Task resource management configuration
TASK_MEMORY_THRESHOLD_PERCENT("atlas.tasks.memory.threshold.percent", 75),
TASK_HIGH_MEMORY_PAUSE_MS("atlas.tasks.high.memory.pause.ms", 2000),
TASK_MAX_RETRY_ATTEMPTS("atlas.tasks.max.retry.attempts", 3);

private static final Configuration APPLICATION_PROPERTIES;

static {
@@ -91,6 +91,10 @@ public void stopQueueWatcher() {
}
}

public boolean isProcessingTasks() {
return watcher != null && watcher.isProcessingTasks();
}

static class TaskConsumer implements Runnable {
private static final int MAX_ATTEMPT_COUNT = 3;

@@ -107,6 +107,10 @@ public boolean isWatcherActive() {
return watcherThread != null;
}

public boolean isProcessingTasks() {
return taskExecutor != null && taskExecutor.isProcessingTasks();
}

@Override
public void stop() throws AtlasException {
stopQueueWatcher();
@@ -30,10 +30,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -59,6 +56,7 @@ public class TaskQueueWatcher implements Runnable {
private static final String ATLAS_TASK_LOCK = "atlas:task:lock";

private final AtomicBoolean shouldRun = new AtomicBoolean(false);
private final AtomicBoolean isProcessingTasks = new AtomicBoolean(false);

public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
@@ -82,6 +80,10 @@ public void shutdown() {
LOG.info("TaskQueueWatcher: Shutdown");
}

public boolean isProcessingTasks() {
return isProcessingTasks.get();
}

@Override
public void run() {
boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean();
@@ -115,10 +117,15 @@ public void run() {
taskMetricsService.updateQueueSize(tasks != null ? tasks.size() : 0);

if (CollectionUtils.isNotEmpty(tasks)) {
final CountDownLatch latch = new CountDownLatch(tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());
waitForTasksToComplete(latch);
isProcessingTasks.set(true);

This logic is just responsible for submitting tasks to the queue.
IMO it should not be the one reporting isProcessingTasks; the best place for this flag would be the TaskExecutor.run() method.

Also, generally we check RequestContext.get().getCurrentTask(); to conclude whether any task is in progress or not. Maybe the same can be used instead of the flag, but this way is also fine.
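
For illustration only, a minimal sketch of what moving the flag to the consumer side could look like. The InFlightTracker name and wiring are hypothetical and not existing Atlas API; in practice the counter would be owned by TaskExecutor and shared with each TaskConsumer:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the executor owns an in-flight counter, and each consumer
// brackets its processing with it, so "is processing" reflects actual execution
// rather than submission.
class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger(0);

    boolean isProcessingTasks() {
        return inFlight.get() > 0;
    }

    // TaskConsumer.run() would wrap its existing processing logic like this:
    void runTask(Runnable existingProcessingLogic) {
        inFlight.incrementAndGet();
        try {
            existingProcessingLogic.run();
        } finally {
            inFlight.decrementAndGet();
        }
    }
}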

try {
final CountDownLatch latch = new CountDownLatch(tasks.size());
submitAll(tasks, latch);
LOG.info("Submitted {} tasks to the queue", tasks.size());
waitForTasksToComplete(latch);
} finally {
isProcessingTasks.set(false);
}
} else {
LOG.info("TaskQueueWatcher: No tasks fetched during this cycle.");
}
@@ -154,23 +161,77 @@ private void waitForTasksToComplete(final CountDownLatch latch) throws InterruptedException {
}
}


private void submitAll(List<AtlasTask> tasks, CountDownLatch latch) {
if (CollectionUtils.isNotEmpty(tasks)) {
if (CollectionUtils.isEmpty(tasks)) {
LOG.info("TasksFetcher: No task to queue");
return;
}

int submittedCount = 0;

for (AtlasTask task : tasks) {
if (task == null) {
continue;
}

for (AtlasTask task : tasks) {
if (task != null) {
String taskGuid = task.getGuid();
boolean taskSubmitted = false;

// Keep trying until the task is submitted
while (!taskSubmitted) {
if (isMemoryTooHigh()) {
LOG.warn("High memory usage detected ({}%), pausing task submission for task: {}",
getMemoryUsagePercent() * 100, taskGuid);

try {
// Wait for memory to be freed
Thread.sleep(AtlasConfiguration.TASK_HIGH_MEMORY_PAUSE_MS.getLong());

// Suggest GC if memory is still high after initial wait
if (isMemoryTooHigh()) {
LOG.info("Memory still high after pause, suggesting garbage collection");
System.gc();
Thread.sleep(1000); // Give GC time to work
}
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted while waiting for memory to free", e);
Thread.currentThread().interrupt();
return; // Exit if interrupted
}
} else {
// Memory is okay, submit the task
TASK_LOG.log(task);
this.executorService.submit(new TaskExecutor.TaskConsumer(task,
this.registry, this.taskTypeFactoryMap, this.statistics, latch));

taskSubmitted = true;
submittedCount++;
LOG.debug("Successfully submitted task: {}", taskGuid);
}

this.executorService.submit(new TaskExecutor.TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics, latch));
}
}

LOG.info("TasksFetcher: Submitted {} tasks to the queue", tasks.size());
} else {
LOG.info("TasksFetcher: No task to queue");
if (submittedCount > 0) {
LOG.info("TasksFetcher: Submitted {} tasks to the queue", submittedCount);

Bug: Task Submission and Latch Decrement Mismatch

The CountDownLatch is initialized with the total task count, but submitAll skips null tasks or exits early without decrementing it. This prevents the latch from reaching zero, causing waitForTasksToComplete to hang. Also, the new task submission retry loop lacks a maximum retry limit, risking an infinite loop if memory stays high.

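One possible shape for the fix, assuming the TASK_MAX_RETRY_ATTEMPTS setting added in this PR (currently never read) is intended to bound the high-memory wait. This is only a sketch against the fields TaskQueueWatcher already has, not the author's implementation; after the bounded wait it submits anyway rather than dropping the task:

private void submitAll(List<AtlasTask> tasks, CountDownLatch latch) {
    if (CollectionUtils.isEmpty(tasks)) {
        LOG.info("TasksFetcher: No task to queue");
        return;
    }

    int maxAttempts = AtlasConfiguration.TASK_MAX_RETRY_ATTEMPTS.getInt();
    int submittedCount = 0;

    for (AtlasTask task : tasks) {
        boolean submitted = false;
        try {
            if (task == null) {
                continue; // finally block still balances the latch
            }

            // Bounded wait instead of an unbounded while-loop when memory is high
            for (int attempt = 0; attempt < maxAttempts && isMemoryTooHigh(); attempt++) {
                Thread.sleep(AtlasConfiguration.TASK_HIGH_MEMORY_PAUSE_MS.getLong());
            }

            TASK_LOG.log(task);
            this.executorService.submit(new TaskExecutor.TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics, latch));
            submitted = true; // the consumer counts the latch down when it finishes
            submittedCount++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep looping so remaining tasks still release the latch
        } finally {
            if (!submitted) {
                latch.countDown(); // skipped or failed submissions must not leave waiters hanging
            }
        }
    }

    LOG.info("TasksFetcher: Submitted {} of {} tasks to the queue", submittedCount, tasks.size());
}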

}
}

private boolean isMemoryTooHigh() {
return getMemoryUsagePercent() > (AtlasConfiguration.TASK_MEMORY_THRESHOLD_PERCENT.getInt() / 100.0);
}

private double getMemoryUsagePercent() {
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;

return (double) usedMemory / maxMemory;
}


static class TasksFetcher {
private TaskRegistry registry;
private List<AtlasTask> tasks = new ArrayList<>();
@@ -416,8 +416,7 @@ public List<AtlasTask> getTasksForReQueueGraphQuery() {
public List<AtlasTask> getTasksForReQueueIndexSearch() {
DirectIndexQueryResult indexQueryResult = null;
List<AtlasTask> ret = new ArrayList<>();

int size = 1000;
int size = AtlasConfiguration.TASKS_QUEUE_SIZE.getInt();
int from = 0;

IndexSearchParams indexSearchParams = new IndexSearchParams();
@@ -291,7 +291,6 @@ public Response getStatus() {
put(STATUS, serviceState.getState().toString());
}};


Response response = Response.ok(AtlasJson.toV1Json(responseData)).build();

if (LOG.isDebugEnabled()) {
@@ -301,6 +300,49 @@ public Response getStatus() {
return response;
}

@GET
@Path("ready")
@Produces(Servlets.JSON_MEDIA_TYPE)
@Timed
public Response getReadyStatus() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AdminResource.getReadyStatus()");
}

// Check if tasks are being processed
boolean isProcessingTasks = taskManagement.isProcessingTasks();

// Check memory usage
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
double memoryUsage = (double) usedMemory / maxMemory;

Map<String, Object> responseData = new HashMap<String, Object>() {{
put("isProcessingTasks", isProcessingTasks);
put("memoryUsage", String.format("%.2f", memoryUsage * 100));
put("status", serviceState.getState().toString());
}};

Response.ResponseBuilder builder;

// Return 503 if processing tasks or high memory
if (isProcessingTasks || memoryUsage > 0.75) {

Bug: Inconsistent Memory Thresholds Across Components

The new /admin/ready endpoint's memory threshold for readiness is hardcoded to 0.75. This creates an inconsistency with the configurable AtlasConfiguration.TASK_MEMORY_THRESHOLD_PERCENT used by other components like TaskQueueWatcher, potentially causing different behavior between the readiness check and task processing.

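A minimal sketch of what the suggested consistency fix could look like, reusing the PR's own config constant; illustrative only, not the author's change:

// Derive the readiness threshold from the same configurable value the task-side checks use
double memoryThreshold = AtlasConfiguration.TASK_MEMORY_THRESHOLD_PERCENT.getInt() / 100.0;

if (isProcessingTasks || memoryUsage > memoryThreshold) {
    builder = Response.status(Response.Status.SERVICE_UNAVAILABLE)
                      .entity(AtlasJson.toV1Json(responseData));
} else {
    builder = Response.ok(AtlasJson.toV1Json(responseData));
}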

builder = Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity(AtlasJson.toV1Json(responseData));
} else {
builder = Response.ok(AtlasJson.toV1Json(responseData));
}

if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.getReadyStatus()");
}

return builder.build();
}

@GET
@Path("session")
@Produces(Servlets.JSON_MEDIA_TYPE)