Skip to content

Commit fb9a275

Browse files
authored
Merge pull request #4119 from atlanhq/dg2027
DG-2027 : Added metrics for number of mismatches found in a single taskFetch call
2 parents 72e6e57 + b34ec9a commit fb9a275

File tree

6 files changed

+73
-11
lines changed

6 files changed

+73
-11
lines changed

common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,23 @@ public void recordMetric(MetricRecorder recorder) {
5050
}
5151
}
5252

53+
public void recordMetric(MetricRecorder recorder, long invocations) {
54+
if (recorder != null) {
55+
final String name = recorder.name;
56+
final long timeTaken = recorder.getElapsedTime();
57+
Metric metric = metrics.get(name);
58+
59+
if (metric == null) {
60+
metric = new Metric(name);
61+
62+
metrics.put(name, metric);
63+
}
64+
65+
metric.invocations += invocations;
66+
metric.totalTimeMSecs += timeTaken;
67+
}
68+
}
69+
5370
public void clear() {
5471
metrics.clear();
5572
}
@@ -149,5 +166,8 @@ public void incrementInvocations() {
149166
invocations++;
150167
}
151168

169+
public void setInvocations(long invocations) {
170+
this.invocations = invocations;
171+
}
152172
}
153173
}

repository/src/main/java/org/apache/atlas/tasks/TaskExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.atlas.RequestContext;
2323
import org.apache.atlas.model.tasks.AtlasTask;
2424
import org.apache.atlas.repository.graphdb.AtlasVertex;
25+
import org.apache.atlas.service.metrics.MetricsRegistry;
2526
import org.apache.atlas.service.redis.RedisService;
2627
import org.apache.atlas.type.AtlasType;
2728
import org.apache.atlas.utils.AtlasPerfTracer;
@@ -49,13 +50,14 @@ public class TaskExecutor {
4950
private final ICuratorFactory curatorFactory;
5051
private final boolean isActiveActiveHAEnabled;
5152
private final String zkRoot;
53+
private final MetricsRegistry metricRegistry;
5254

5355
private TaskQueueWatcher watcher;
5456
private Thread watcherThread;
5557
private RedisService redisService;
5658

5759
public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
58-
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled) {
60+
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled, MetricsRegistry metricsRegistry) {
5961
this.taskExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
6062
.setDaemon(true)
6163
.setNameFormat(TASK_NAME_FORMAT + Thread.currentThread().getName())
@@ -68,11 +70,12 @@ public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFact
6870
this.redisService = redisService;
6971
this.isActiveActiveHAEnabled = isActiveActiveHAEnabled;
7072
this.zkRoot = zkRoot;
73+
this.metricRegistry = metricsRegistry;
7174
}
7275

7376
public Thread startWatcherThread() {
7477

75-
watcher = new TaskQueueWatcher(taskExecutorService, registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot, isActiveActiveHAEnabled);
78+
watcher = new TaskQueueWatcher(taskExecutorService, registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot, isActiveActiveHAEnabled, metricRegistry);
7679
watcherThread = new Thread(watcher);
7780
watcherThread.start();
7881
return watcherThread;

repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.atlas.listener.ActiveStateChangeHandler;
2727
import org.apache.atlas.model.tasks.AtlasTask;
2828
import org.apache.atlas.service.Service;
29+
import org.apache.atlas.service.metrics.MetricsRegistry;
2930
import org.apache.atlas.service.redis.RedisService;
3031
import org.apache.commons.collections.CollectionUtils;
3132
import org.apache.commons.configuration.Configuration;
@@ -45,6 +46,7 @@
4546
@Order(7)
4647
public class TaskManagement implements Service, ActiveStateChangeHandler {
4748
private static final Logger LOG = LoggerFactory.getLogger(TaskManagement.class);
49+
private final MetricsRegistry metricRegistry;
4850

4951
private TaskExecutor taskExecutor;
5052
private final Configuration configuration;
@@ -61,19 +63,21 @@ public enum DeleteType {
6163
}
6264

6365
@Inject
64-
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry, ICuratorFactory curatorFactory, RedisService redisService) {
66+
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry, ICuratorFactory curatorFactory, RedisService redisService, MetricsRegistry metricsRegistry) {
6567
this.configuration = configuration;
6668
this.registry = taskRegistry;
6769
this.redisService = redisService;
6870
this.statistics = new Statistics();
6971
this.taskTypeFactoryMap = new HashMap<>();
7072
this.curatorFactory = curatorFactory;
73+
this.metricRegistry = metricsRegistry;
7174
}
7275

7376
@VisibleForTesting
7477
TaskManagement(Configuration configuration, TaskRegistry taskRegistry, TaskFactory taskFactory, ICuratorFactory curatorFactory, RedisService redisService) {
7578
this.configuration = configuration;
7679
this.registry = taskRegistry;
80+
this.metricRegistry = null;
7781
this.redisService = redisService;
7882
this.statistics = new Statistics();
7983
this.taskTypeFactoryMap = new HashMap<>();
@@ -251,7 +255,7 @@ private synchronized void startWatcherThread() {
251255
if (this.taskExecutor == null) {
252256
final boolean isActiveActiveHAEnabled = HAConfiguration.isActiveActiveHAEnabled(configuration);
253257
final String zkRoot = HAConfiguration.getZookeeperProperties(configuration).getZkRoot();
254-
this.taskExecutor = new TaskExecutor(registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot,isActiveActiveHAEnabled);
258+
this.taskExecutor = new TaskExecutor(registry, taskTypeFactoryMap, statistics, curatorFactory, redisService, zkRoot,isActiveActiveHAEnabled, metricRegistry);
255259
}
256260

257261
if (watcherThread == null) {

repository/src/main/java/org/apache/atlas/tasks/TaskQueueWatcher.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.apache.atlas.AtlasConfiguration;
2121
import org.apache.atlas.AtlasConstants;
2222
import org.apache.atlas.ICuratorFactory;
23+
import org.apache.atlas.RequestContext;
2324
import org.apache.atlas.model.tasks.AtlasTask;
25+
import org.apache.atlas.service.metrics.MetricsRegistry;
2426
import org.apache.atlas.service.redis.RedisService;
2527
import org.apache.commons.collections.CollectionUtils;
2628
import org.slf4j.Logger;
@@ -41,6 +43,7 @@ public class TaskQueueWatcher implements Runnable {
4143
private static final TaskExecutor.TaskLogger TASK_LOG = TaskExecutor.TaskLogger.getLogger();
4244
private final String zkRoot;
4345
private final boolean isActiveActiveHAEnabled;
46+
private final MetricsRegistry metricRegistry;
4447

4548
private TaskRegistry registry;
4649
private final ExecutorService executorService;
@@ -57,7 +60,7 @@ public class TaskQueueWatcher implements Runnable {
5760

5861
public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
5962
Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics,
60-
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled) {
63+
ICuratorFactory curatorFactory, RedisService redisService, final String zkRoot, boolean isActiveActiveHAEnabled, MetricsRegistry metricsRegistry) {
6164

6265
this.registry = registry;
6366
this.executorService = executorService;
@@ -67,6 +70,7 @@ public TaskQueueWatcher(ExecutorService executorService, TaskRegistry registry,
6770
this.redisService = redisService;
6871
this.zkRoot = zkRoot;
6972
this.isActiveActiveHAEnabled = isActiveActiveHAEnabled;
73+
this.metricRegistry = metricsRegistry;
7074
}
7175

7276
public void shutdown() {
@@ -87,6 +91,8 @@ public void run() {
8791
LOG.debug("TaskQueueWatcher: running {}:{}", Thread.currentThread().getName(), Thread.currentThread().getId());
8892
}
8993
while (shouldRun.get()) {
94+
RequestContext requestContext = RequestContext.get();
95+
requestContext.setMetricRegistry(this.metricRegistry);
9096
TasksFetcher fetcher = new TasksFetcher(registry);
9197
try {
9298
if (!redisService.acquireDistributedLock(ATLAS_TASK_LOCK)) {
@@ -156,6 +162,8 @@ public void run() {
156162
}
157163

158164
this.tasks = registry.getTasksForReQueue();
165+
RequestContext requestContext = RequestContext.get();
166+
requestContext.clearCache();
159167
}
160168

161169
public List<AtlasTask> getTasks() {

repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.atlas.repository.graphdb.DirectIndexQueryResult;
3636
import org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery;
3737
import org.apache.atlas.type.AtlasType;
38+
import org.apache.atlas.utils.AtlasMetricType;
3839
import org.apache.atlas.utils.AtlasPerfMetrics;
3940
import org.apache.commons.collections.CollectionUtils;
4041
import org.apache.commons.collections4.ListUtils;
@@ -53,6 +54,7 @@
5354
import java.util.List;
5455
import java.util.Arrays;
5556
import java.util.Map;
57+
import java.util.Objects;
5658
import java.util.LinkedHashMap;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.stream.Collectors;
@@ -67,6 +69,7 @@ public class TaskRegistry {
6769
public static final int TASK_FETCH_BATCH_SIZE = 100;
6870
public static final List<Map<String, Object>> SORT_ARRAY = Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc")));
6971
public static final String JANUSGRAPH_VERTEX_INDEX = "janusgraph_vertex_index";
72+
public static final String TASK_MISMATCH_TAG = "mismatchTask";
7073

7174
private AtlasGraph graph;
7275
private TaskService taskService;
@@ -398,6 +401,7 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
398401
Map<String, Object> dsl = mapOf("query", mapOf("bool", mapOf("should", statusClauseList)));
399402
dsl.put("sort", Collections.singletonList(mapOf(Constants.TASK_CREATED_TIME, mapOf("order", "asc"))));
400403
dsl.put("size", size);
404+
long mismatches = 0;
401405
int totalFetched = 0;
402406
while (true) {
403407
int fetched = 0;
@@ -436,8 +440,14 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
436440
} else {
437441
LOG.warn("Status mismatch for task with guid: {}. Expected PENDING/IN_PROGRESS but found: {}",
438442
atlasTask.getGuid(), atlasTask.getStatus());
439-
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
440-
repairMismatchedTask(atlasTask, docId);
443+
mismatches++;
444+
try {
445+
String docId = LongEncoding.encode(Long.parseLong(vertex.getIdForDisplay()));
446+
repairMismatchedTask(atlasTask, docId);
447+
}
448+
catch (Exception e){
449+
e.printStackTrace();
450+
}
441451
}
442452
} else {
443453
LOG.warn("Null vertex while re-queuing tasks at index {}", fetched);
@@ -456,7 +466,14 @@ public List<AtlasTask> getTasksForReQueueIndexSearch() {
456466
break;
457467
}
458468
}
459-
469+
if(mismatches > 0) {
470+
AtlasPerfMetrics.Metric mismatchMetrics = new AtlasPerfMetrics.Metric(TASK_MISMATCH_TAG);
471+
mismatchMetrics.setMetricType(AtlasMetricType.COUNTER);
472+
mismatchMetrics.addTag("name", TASK_MISMATCH_TAG);
473+
mismatchMetrics.setInvocations(mismatches);
474+
mismatchMetrics.setTotalTimeMSecs(0);
475+
RequestContext.get().addApplicationMetrics(mismatchMetrics);
476+
}
460477
return ret;
461478
}
462479

@@ -466,10 +483,14 @@ private void repairMismatchedTask(AtlasTask atlasTask, String docId) {
466483
try {
467484
// Create a map for the fields to be updated
468485
Map<String, Object> fieldsToUpdate = new HashMap<>();
469-
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
470-
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
486+
if(Objects.nonNull(atlasTask.getEndTime())) {
487+
fieldsToUpdate.put("__task_endTime", atlasTask.getEndTime().getTime());
488+
}
489+
if(Objects.nonNull(atlasTask.getTimeTakenInSeconds())) {
490+
fieldsToUpdate.put("__task_timeTakenInSeconds", atlasTask.getTimeTakenInSeconds());
491+
}
471492
fieldsToUpdate.put("__task_status", atlasTask.getStatus().toString());
472-
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime()); // Set current timestamp
493+
fieldsToUpdate.put("__task_modificationTimestamp", atlasTask.getUpdatedTime().getTime());
473494

474495
// Convert fieldsToUpdate map to JSON using Jackson
475496
ObjectMapper objectMapper = new ObjectMapper();

server-api/src/main/java/org/apache/atlas/RequestContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,12 @@ public void endMetricRecord(MetricRecorder recorder) {
669669
}
670670
}
671671

672+
public void endMetricRecord(MetricRecorder recorder,long invocations){
673+
if (metrics != null && recorder != null) {
674+
metrics.recordMetric(recorder, invocations);
675+
}
676+
}
677+
672678
public void recordEntityGuidUpdate(AtlasEntity entity, String guidInRequest) {
673679
recordEntityGuidUpdate(new EntityGuidPair(entity, guidInRequest));
674680
}

0 commit comments

Comments
 (0)