-
Notifications
You must be signed in to change notification settings - Fork 9
Observability | Bulk Analyzer #5535
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add AtlasObservabilityData class with all required metrics fields - Add AtlasObservabilityService using Micrometer for metrics recording - Add PayloadAnalyzer for array relationship and attribute analysis - Instrument AtlasEntityStoreV2.createOrUpdate() with timing and payload metrics - Add observability implementation plan documentation - Capture trace_id, client_origin, timing metrics, and array counts - Support both relationship arrays and regular attribute arrays
- Remove traceId, agentId, vertexIds, assetGuids from Prometheus metrics to prevent cardinality explosion - Keep high-cardinality fields only for error logging via logErrorDetails() - Update observability implementation plan with cardinality management guidelines - Add error handling around observability metrics recording - Add array attributes analysis similar to relationship attributes
- Set MDC filter key 'atlas-observability' for error logging - Use OBSERVABILITY logger for proper log routing - Add proper MDC cleanup in finally block - Follows same pattern as other Atlas loggers (audit, metrics, tasks)
- Replace manual MDC.put/remove with MDCScope.of() - Follows Atlas best practices for MDC management - Automatically restores previous MDC state on close - Cleaner and more robust than try/finally approach
- Record individual relationship types (process, inputs, outputs, etc.) with counts - Record individual attribute types with counts - Use Counter metrics with relationship_name/attribute_name tags - Enables detailed analysis: process:1, inputs:3, outputs:2, etc. - Maintains existing total count metrics for aggregation
387f550 to
0d3c27e
Compare
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds comprehensive Micrometer-based observability instrumentation for Atlas bulk create/update operations, introducing metrics tracking for payload analysis, timing breakdowns, and operational status monitoring.
Key Changes:
- Introduced
AtlasObservabilityServicewith Micrometer metrics (gauges, timers, counters, distribution summaries) for tracking operation performance, payload characteristics, and errors - Added timing instrumentation across entity pipeline stages (validation, diff calculation, ingestion, lineage calculation)
- Created
PayloadAnalyzerto compute payload metrics including sizes, relationships, and attributes
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java |
New service providing Micrometer-based metrics recording with SLOs, percentiles, and low-cardinality tags for Prometheus compatibility |
repository/src/main/java/org/apache/atlas/observability/PayloadAnalyzer.java |
New analyzer for extracting payload metrics including entity counts, relationships, and attributes |
common/src/main/java/org/apache/atlas/observability/AtlasObservabilityData.java |
New data model holding observability metrics including timing, payload, and request metadata |
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java |
Instrumented createOrUpdate method with timing capture, payload analysis, and metrics recording with error handling |
server-api/src/main/java/org/apache/atlas/RequestContext.java |
Added lineage calculation timing accumulation fields and methods |
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java |
Added lineage calculation timing in addHasLineage method |
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java |
Added lineage calculation timing in deletion handlers |
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityComparator.java |
Wrapped diff calculation with performance metric recording |
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java |
Added accessor method for entitiesWithExtInfo to enable payload analysis |
observability.mdc |
Comprehensive documentation of all metrics with PromQL queries, dashboard recommendations, and alerting rules |
.github/workflows/maven.yml |
Whitespace-only formatting changes to CI workflow |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Outdated
Show resolved
Hide resolved
| observabilityService.recordArrayRelationships(observabilityData); | ||
| observabilityService.recordArrayAttributes(observabilityData); | ||
| observabilityService.recordTimingMetrics(observabilityData); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Missing Performance Metrics for Key Operations
The observability metrics for notificationTime and auditLogTime are never captured in the createOrUpdate method. The entityChangeNotifier.onEntitiesMutated() and entityChangeNotifier.notifyDifferentialEntityChanges() calls at lines 1786-1788 represent notification processing, but their duration is not measured and set via observabilityData.setNotificationTime(). Similarly, audit logging time is never tracked. This means these metrics will always report 0, making them useless for identifying performance bottlenecks in notification and audit logging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 13 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/observability/PayloadAnalyzer.java
Outdated
Show resolved
Hide resolved
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
| try { | ||
| // Timing: Lineage calculation | ||
| long lineageCalcStart = System.currentTimeMillis(); | ||
|
|
||
| if (RequestContext.get().skipHasLineageCalculation()) { | ||
| return; | ||
| } | ||
| if (RequestContext.get().skipHasLineageCalculation()) { | ||
| return; | ||
| } | ||
|
|
||
| for (AtlasVertex vertexToBeDeleted : vertices) { | ||
| if (ACTIVE.equals(getStatus(vertexToBeDeleted))) { | ||
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(getTypeName(vertexToBeDeleted)); | ||
| boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); | ||
| boolean isCatalog = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); | ||
| for (AtlasVertex vertexToBeDeleted : vertices) { | ||
| if (ACTIVE.equals(getStatus(vertexToBeDeleted))) { | ||
| AtlasEntityType entityType = typeRegistry.getEntityTypeByName(getTypeName(vertexToBeDeleted)); | ||
| boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); | ||
| boolean isCatalog = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); | ||
|
|
||
| if (isCatalog || isProcess) { | ||
| if (isCatalog || isProcess) { | ||
|
|
||
| Iterator<AtlasEdge> edgeIterator = vertexToBeDeleted.getEdges(AtlasEdgeDirection.BOTH, PROCESS_EDGE_LABELS).iterator(); | ||
| Iterator<AtlasEdge> edgeIterator = vertexToBeDeleted.getEdges(AtlasEdgeDirection.BOTH, PROCESS_EDGE_LABELS).iterator(); | ||
|
|
||
| Set<AtlasEdge> edgesToBeDeleted = new HashSet<>(); | ||
| Set<AtlasEdge> edgesToBeDeleted = new HashSet<>(); | ||
|
|
||
| while (edgeIterator.hasNext()) { | ||
| AtlasEdge edge = edgeIterator.next(); | ||
| if (ACTIVE.equals(getStatus(edge))) { | ||
| edgesToBeDeleted.add(edge); | ||
| while (edgeIterator.hasNext()) { | ||
| AtlasEdge edge = edgeIterator.next(); | ||
| if (ACTIVE.equals(getStatus(edge))) { | ||
| edgesToBeDeleted.add(edge); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| resetHasLineageOnInputOutputDelete(edgesToBeDeleted, vertexToBeDeleted); | ||
| resetHasLineageOnInputOutputDelete(edgesToBeDeleted, vertexToBeDeleted); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Record lineage calculation time | ||
| long lineageCalcTime = System.currentTimeMillis() - lineageCalcStart; | ||
| RequestContext.get().addLineageCalcTime(lineageCalcTime); | ||
| } finally { | ||
| RequestContext.get().endMetricRecord(metricRecorder); | ||
| } |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lineage timing measurement wraps the entire method body in a try-finally block. However, if skipHasLineageCalculation() returns true (line 1653), the method returns early, and the timing recorded (difference of ~0ms) will be misleading since no actual lineage calculation occurred. This could skew the observability metrics.
Consider moving the timing start after the early return check, or add a flag to indicate whether lineage calculation was actually performed.
repository/src/main/java/org/apache/atlas/observability/PayloadAnalyzer.java
Outdated
Show resolved
Hide resolved
| } catch (AtlasBaseException e) { | ||
| // Record operation failure | ||
| if (!operationRecorded) { | ||
| String errorCode = e.getAtlasErrorCode() != null ? e.getAtlasErrorCode().getErrorCode() : "UNKNOWN_ERROR"; | ||
| observabilityService.recordOperationFailure("createOrUpdate", errorCode); | ||
| operationRecorded = true; | ||
| } | ||
| throw e; | ||
| } catch (Exception e) { | ||
| // Record operation failure | ||
| if (!operationRecorded) { | ||
| observabilityService.recordOperationFailure("createOrUpdate", e.getClass().getSimpleName()); | ||
| operationRecorded = true; | ||
| } | ||
| throw new AtlasBaseException(e); |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception handling logic has redundant catch blocks. When an AtlasBaseException is caught at line 1842, it's re-thrown after recording the failure. Then a generic Exception catch block at line 1850 wraps it in a new AtlasBaseException. This means an AtlasBaseException will be caught, re-thrown, caught again by the second catch block, and wrapped in another AtlasBaseException.
Consider removing the AtlasBaseException catch block and only keeping the generic Exception catch block, or ensure the second catch block doesn't catch AtlasBaseException by checking the exception type.
| // Ensure operationsInProgress is decremented even if recordOperationEnd/Failure wasn't called | ||
| if (!operationRecorded) { | ||
| try { | ||
| observabilityService.recordOperationFailure("createOrUpdate", "unexpected_error"); | ||
| } catch (Exception e) { | ||
| // Log but don't throw - we're in finally block | ||
| LOG.warn("Failed to record operation failure in finally block", e); | ||
| } | ||
| } |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The finally block attempts to record an operation failure if operationRecorded is false, but this could fail and throw an exception. In a finally block, throwing an exception will suppress any exception that was being thrown from the try block, potentially masking the original error.
The code logs the exception but catching and logging in a finally block is risky. Consider ensuring that no exceptions escape from the finally block by wrapping all operations in a try-catch that swallows all exceptions (which is already done), but verify this pattern doesn't mask critical failures.
repository/src/main/java/org/apache/atlas/observability/AtlasObservabilityService.java
Show resolved
Hide resolved
| histogram_quantile(0.95, rate(atlas_observability_bulk_duration_seconds_bucket[5m])) < 5 | ||
| ``` | ||
|
|
||
| #### Payload Size SLO Compliance (95% under 1MB) |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in the PromQL query comment. The value 1048576 is 1 MiB (mebibyte, 1024^2 bytes), not 1MB (megabyte, 1000^2 bytes). The comment should say "1MiB" for accuracy.
| #### Payload Size SLO Compliance (95% under 1MB) | |
| #### Payload Size SLO Compliance (95% under 1MiB) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
Show resolved
Hide resolved
|
|
||
| // Analyze array relationships | ||
| analyzeArrayRelationships(entitiesWithExtInfo, data); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Broken Payload Size Observability
The analyzePayload() method never sets payloadRequestBytes on the AtlasObservabilityData object. The field remains at its default value of 0, causing the payload_bytes metric to always report zero instead of the actual payload size in bytes. This breaks the payload size monitoring feature described in the observability documentation.
Change description
Add Observability Metrics for Bulk Operations
Adds comprehensive observability metrics for bulk create/update operations to help identify performance bottlenecks.
Features Added
Type of change
Related issues
Helm Config Changes for Running Tests (Staging PR)
Does this PR require Helm config changes for testing?
enpla9up36. (You can proceed with the PR.) ✅Checklists
Development
Security
Code review
Note
Adds Micrometer-based observability for bulk create/update (payload, timing, relationships, gauges, errors) and lineage timing, with docs and minimal API hooks.
AtlasObservabilityServiceandAtlasObservabilityDatato emit timers/counters/distributions and gauges (operations_in_progress,total_operations).PayloadAnalyzerto computearray_relationships, per-relationship counts, and array attribute counts fromAtlasEntitiesWithExtInfo.AtlasEntityStoreV2#createOrUpdate: operation start/end/failure, timings (validation, diff, ingestion), payload metrics, RequestContext counters; integratesPayloadAnalyzerand records Prometheus-safe tags.AtlasEntityStream#getEntitiesWithExtInfofor analysis.DeleteHandlerV1andEntityGraphMapper; addslineageCalcTimefields/methods toRequestContext.AtlasEntityComparator#getDiffResultwith perf metric.observability.mdcwith full metric catalog, Grafana queries, dashboards, and alerting examples.Written by Cursor Bugbot for commit b151f12. This will update automatically on new commits. Configure here.