-
Notifications
You must be signed in to change notification settings - Fork 9
MLH-1239 Tags propagate isolate #5379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,10 +30,7 @@ | |
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import javax.annotation.PreDestroy; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.*; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -59,6 +56,7 @@ public class TaskQueueWatcher implements Runnable { | |
| private static final String ATLAS_TASK_LOCK = "atlas:task:lock"; | ||
|
|
||
| private final AtomicBoolean shouldRun = new AtomicBoolean(false); | ||
| private final AtomicBoolean isProcessingTasks = new AtomicBoolean(false); | ||
|
|
||
| public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry, | ||
| Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics, | ||
|
|
@@ -82,6 +80,10 @@ public void shutdown() { | |
| LOG.info("TaskQueueWatcher: Shutdown"); | ||
| } | ||
|
|
||
| public boolean isProcessingTasks() { | ||
| return isProcessingTasks.get(); | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| boolean isMaintenanceMode = AtlasConfiguration.ATLAS_MAINTENANCE_MODE.getBoolean(); | ||
|
|
@@ -115,10 +117,15 @@ public void run() { | |
| taskMetricsService.updateQueueSize(tasks != null ? tasks.size() : 0); | ||
|
|
||
| if (CollectionUtils.isNotEmpty(tasks)) { | ||
| final CountDownLatch latch = new CountDownLatch(tasks.size()); | ||
| submitAll(tasks, latch); | ||
| LOG.info("Submitted {} tasks to the queue", tasks.size()); | ||
| waitForTasksToComplete(latch); | ||
| isProcessingTasks.set(true); | ||
| try { | ||
| final CountDownLatch latch = new CountDownLatch(tasks.size()); | ||
| submitAll(tasks, latch); | ||
| LOG.info("Submitted {} tasks to the queue", tasks.size()); | ||
| waitForTasksToComplete(latch); | ||
| } finally { | ||
| isProcessingTasks.set(false); | ||
| } | ||
| } else { | ||
| LOG.info("TaskQueueWatcher: No tasks fetched during this cycle."); | ||
| } | ||
|
|
@@ -154,23 +161,77 @@ private void waitForTasksToComplete(final CountDownLatch latch) throws Interrupt | |
| } | ||
| } | ||
|
|
||
|
|
||
| private void submitAll(List<AtlasTask> tasks, CountDownLatch latch) { | ||
| if (CollectionUtils.isNotEmpty(tasks)) { | ||
| if (CollectionUtils.isEmpty(tasks)) { | ||
| LOG.info("TasksFetcher: No task to queue"); | ||
| return; | ||
| } | ||
|
|
||
| int submittedCount = 0; | ||
|
|
||
| for (AtlasTask task : tasks) { | ||
| if (task == null) { | ||
| continue; | ||
| } | ||
|
|
||
| for (AtlasTask task : tasks) { | ||
| if (task != null) { | ||
| String taskGuid = task.getGuid(); | ||
| boolean taskSubmitted = false; | ||
|
|
||
| // Keep trying until the task is submitted | ||
| while (!taskSubmitted) { | ||
| if (isMemoryTooHigh()) { | ||
| LOG.warn("High memory usage detected ({}%), pausing task submission for task: {}", | ||
| getMemoryUsagePercent() * 100, taskGuid); | ||
|
|
||
| try { | ||
| // Wait for memory to be freed | ||
| Thread.sleep(AtlasConfiguration.TASK_HIGH_MEMORY_PAUSE_MS.getLong()); | ||
|
|
||
| // Suggest GC if memory is still high after initial wait | ||
| if (isMemoryTooHigh()) { | ||
| LOG.info("Memory still high after pause, suggesting garbage collection"); | ||
| System.gc(); | ||
| Thread.sleep(1000); // Give GC time to work | ||
| } | ||
| } catch (InterruptedException e) { | ||
| LOG.warn("Sleep interrupted while waiting for memory to free", e); | ||
| Thread.currentThread().interrupt(); | ||
| return; // Exit if interrupted | ||
| } | ||
| } else { | ||
| // Memory is okay, submit the task | ||
| TASK_LOG.log(task); | ||
| this.executorService.submit(new TaskExecutor.TaskConsumer(task, | ||
| this.registry, this.taskTypeFactoryMap, this.statistics, latch)); | ||
|
|
||
| taskSubmitted = true; | ||
| submittedCount++; | ||
| LOG.debug("Successfully submitted task: {}", taskGuid); | ||
| } | ||
|
|
||
| this.executorService.submit(new TaskExecutor.TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics, latch)); | ||
| } | ||
| } | ||
|
|
||
| LOG.info("TasksFetcher: Submitted {} tasks to the queue", tasks.size()); | ||
| } else { | ||
| LOG.info("TasksFetcher: No task to queue"); | ||
| if (submittedCount > 0) { | ||
| LOG.info("TasksFetcher: Submitted {} tasks to the queue", submittedCount); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Task Submission and Latch Decrement MismatchThe |
||
| } | ||
| } | ||
|
|
||
| private boolean isMemoryTooHigh() { | ||
| return getMemoryUsagePercent() > (AtlasConfiguration.TASK_MEMORY_THRESHOLD_PERCENT.getInt() / 100.0); | ||
| } | ||
|
|
||
| private double getMemoryUsagePercent() { | ||
| Runtime runtime = Runtime.getRuntime(); | ||
| long maxMemory = runtime.maxMemory(); | ||
| long totalMemory = runtime.totalMemory(); | ||
| long freeMemory = runtime.freeMemory(); | ||
| long usedMemory = totalMemory - freeMemory; | ||
|
|
||
| return (double) usedMemory / maxMemory; | ||
| } | ||
|
|
||
|
|
||
| static class TasksFetcher { | ||
| private TaskRegistry registry; | ||
| private List<AtlasTask> tasks = new ArrayList<>(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -291,7 +291,6 @@ public Response getStatus() { | |
| put(STATUS, serviceState.getState().toString()); | ||
| }}; | ||
|
|
||
|
|
||
| Response response = Response.ok(AtlasJson.toV1Json(responseData)).build(); | ||
|
|
||
| if (LOG.isDebugEnabled()) { | ||
|
|
@@ -301,6 +300,49 @@ public Response getStatus() { | |
| return response; | ||
| } | ||
|
|
||
| @GET | ||
| @Path("ready") | ||
| @Produces(Servlets.JSON_MEDIA_TYPE) | ||
| @Timed | ||
| public Response getReadyStatus() { | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("==> AdminResource.getReadyStatus()"); | ||
| } | ||
|
|
||
| // Check if tasks are being processed | ||
| boolean isProcessingTasks = taskManagement.isProcessingTasks(); | ||
|
|
||
| // Check memory usage | ||
| Runtime runtime = Runtime.getRuntime(); | ||
| long maxMemory = runtime.maxMemory(); | ||
| long totalMemory = runtime.totalMemory(); | ||
| long freeMemory = runtime.freeMemory(); | ||
| long usedMemory = totalMemory - freeMemory; | ||
| double memoryUsage = (double) usedMemory / maxMemory; | ||
|
|
||
| Map<String, Object> responseData = new HashMap<String, Object>() {{ | ||
| put("isProcessingTasks", isProcessingTasks); | ||
| put("memoryUsage", String.format("%.2f", memoryUsage * 100)); | ||
| put("status", serviceState.getState().toString()); | ||
| }}; | ||
|
|
||
| Response.ResponseBuilder builder; | ||
|
|
||
| // Return 503 if processing tasks or high memory | ||
| if (isProcessingTasks || memoryUsage > 0.75) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Inconsistent Memory Thresholds Across ComponentsThe new |
||
| builder = Response.status(Response.Status.SERVICE_UNAVAILABLE) | ||
| .entity(AtlasJson.toV1Json(responseData)); | ||
| } else { | ||
| builder = Response.ok(AtlasJson.toV1Json(responseData)); | ||
| } | ||
|
|
||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("<== AdminResource.getReadyStatus()"); | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
| @GET | ||
| @Path("session") | ||
| @Produces(Servlets.JSON_MEDIA_TYPE) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic just responsible for submittiing tasks to the queue.
IMO it should never tell whether
isProcessingTasks, best please for this flag would be the TaskExecutor.run() method.Also, generally we do check
RequestContext.get().getCurrentTask();to conclude whether any task is in progress or not, may be same can be used instead the flag but this way also fine