From 1e31a6f9043bd41d45d3fd65cc85e560a99febab Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 1 Sep 2025 00:50:30 +0530 Subject: [PATCH 1/9] lineage: model change to support Product Lineage --- .../atlas/model/lineage/LineageListRequest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index b8274d67ee3..bd45e1082cf 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -27,6 +27,8 @@ public class LineageListRequest { private Boolean excludeClassifications; private Boolean immediateNeighbours=false; + public static final String LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE = "ProductAssetLineage"; + public Boolean getImmediateNeighbours() { return immediateNeighbours; } @@ -35,6 +37,8 @@ public void setImmediateNeighbours(Boolean immediateNeighbours) { this.immediateNeighbours = immediateNeighbours; } + private String lineageType = LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE; + public enum LineageDirection {INPUT, OUTPUT} public LineageListRequest() { @@ -91,6 +95,14 @@ public void setDepth(Integer depth) { this.depth = depth; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } + public LineageDirection getDirection() { return direction; } From 1c1c7d0265ad6b4674026fbdb960e0ec9096eb61 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Thu, 4 Sep 2025 10:21:58 +0530 Subject: [PATCH 2/9] lineage: request payload changes --- .../atlas/model/lineage/LineageListRequest.java | 3 ++- .../atlas/model/lineage/LineageOnDemandRequest.java | 11 +++++++++++ .../apache/atlas/discovery/AtlasLineageContext.java | 9 +++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java index bd45e1082cf..7147934aa47 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageListRequest.java @@ -28,6 +28,7 @@ public class LineageListRequest { private Boolean immediateNeighbours=false; public static final String LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE = "ProductAssetLineage"; + public static final String LINEAGE_TYPE_DATASET_PROCESS_LINEAGE = "DatasetProcessLineage"; public Boolean getImmediateNeighbours() { return immediateNeighbours; @@ -37,7 +38,7 @@ public void setImmediateNeighbours(Boolean immediateNeighbours) { this.immediateNeighbours = immediateNeighbours; } - private String lineageType = LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE; + private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; public enum LineageDirection {INPUT, OUTPUT} diff --git a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java index 7fa953373a6..bcee32a5220 100644 --- a/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/lineage/LineageOnDemandRequest.java @@ -11,6 +11,8 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; +import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; +import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE; @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @@ -22,6 +24,7 @@ public class LineageOnDemandRequest { private Set attributes; private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; public LineageOnDemandRequest() { this.attributes = new HashSet<>(); @@ -64,6 +67,14 @@ public void setRelationshipTraversalFilters(SearchParameters.FilterCriteria rela this.relationshipTraversalFilters = relationshipTraversalFilters; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } + public Set getAttributes() { return attributes; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageContext.java index 377c3b360b8..162644ad34a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageContext.java @@ -45,6 +45,7 @@ public class AtlasLineageContext { private boolean isDataset; private boolean isProcess; + private boolean isProduct; private Set attributes; private Set ignoredProcesses; @@ -111,6 +112,14 @@ public void setProcess(boolean process) { isProcess = process; } + public boolean isProduct() { + return isProduct; + } + + public void setProduct(boolean product) { + isProduct = product; + } + public AtlasLineageInfo.LineageDirection getDirection() { return direction; } From d1b59c122ec2793573f5db75123aa6909f361942 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Fri, 5 Sep 2025 13:21:44 +0530 Subject: [PATCH 3/9] lineage: added getter and setter function in context models --- .../atlas/discovery/AtlasLineageListContext.java | 12 ++++++++++++ .../discovery/AtlasLineageOnDemandContext.java | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java index 06a9291b237..837fffff885 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageListContext.java @@ -8,6 +8,8 @@ import org.apache.commons.collections.Predicate; import java.util.Set; +import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; + public final class AtlasLineageListContext { private String guid; private int size; @@ -23,6 +25,7 @@ public final class AtlasLineageListContext { private int currentEntityCounter; private boolean depthLimitReached; private boolean hasMoreUpdated; + private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; private Boolean immediateNeighbours; public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeRegistry typeRegistry) { @@ -36,6 +39,7 @@ public AtlasLineageListContext(LineageListRequest lineageListRequest, AtlasTypeR this.edgeTraversalPredicate = constructInMemoryPredicate(typeRegistry, lineageListRequest.getRelationshipTraversalFilters()); this.attributes = lineageListRequest.getAttributes(); this.relationAttributes = lineageListRequest.getRelationAttributes(); + this.lineageType = lineageListRequest.getLineageType(); this.immediateNeighbours = lineageListRequest.getImmediateNeighbours(); } @@ -131,6 +135,14 @@ public void setCurrentFromCounter(int currentFromCounter) { this.currentFromCounter = currentFromCounter; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } + public int getCurrentEntityCounter() { return currentEntityCounter; } diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java index 55096848550..f5c28306382 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasLineageOnDemandContext.java @@ -14,6 +14,8 @@ import java.util.Map; import java.util.Set; +import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; + public class AtlasLineageOnDemandContext { private static final Logger LOG = LoggerFactory.getLogger(AtlasLineageContext.class); @@ -24,6 +26,8 @@ public class AtlasLineageOnDemandContext { private Set relationAttributes; private LineageOnDemandBaseParams defaultParams; + private String lineageType = LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; + public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest, AtlasTypeRegistry typeRegistry) { this.constraints = lineageOnDemandRequest.getConstraints(); this.attributes = lineageOnDemandRequest.getAttributes(); @@ -31,6 +35,7 @@ public AtlasLineageOnDemandContext(LineageOnDemandRequest lineageOnDemandRequest this.defaultParams = lineageOnDemandRequest.getDefaultParams(); this.vertexPredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getEntityTraversalFilters()); this.edgePredicate = constructInMemoryPredicate(typeRegistry, lineageOnDemandRequest.getRelationshipTraversalFilters()); + this.lineageType = lineageOnDemandRequest.getLineageType(); } public Map getConstraints() { @@ -57,6 +62,14 @@ public void setEdgePredicate(Predicate edgePredicate) { this.edgePredicate = edgePredicate; } + public String getLineageType() { + return lineageType; + } + + public void setLineageType(String lineageType) { + this.lineageType = lineageType; + } + public Set getAttributes() { return attributes; } From d6024181bd1f1b640297aecc06b063c6b8f34134 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 8 Sep 2025 01:31:21 +0530 Subject: [PATCH 4/9] lineage: support multilevel Product lineage --- .../atlas/discovery/EntityLineageService.java | 193 +++++++++++++----- .../java/org/apache/atlas/RequestContext.java | 19 ++ 2 files changed, 161 insertions(+), 51 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 5d9472b1b53..ba72e95c2cb 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -71,6 +71,8 @@ import static org.apache.atlas.AtlasErrorCode.INSTANCE_LINEAGE_QUERY_FAILED; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection.*; +import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_DATASET_PROCESS_LINEAGE; +import static org.apache.atlas.model.lineage.LineageListRequest.LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE; import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; import static org.apache.atlas.repository.graphdb.AtlasEdgeDirection.IN; @@ -83,11 +85,15 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String OUTPUT_PORT_EDGE = "__Asset.outputPortDataProducts"; + private static final String INPUT_PORT_EDGE = "__Asset.inputPortDataProducts"; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; private static final int LINEAGE_ON_DEMAND_DEFAULT_DEPTH = 3; private static final String SEPARATOR = "->"; + public static final String IS_DATA_PRODUCT = "isDataProduct"; + public static final String IS_DATASET = "isDataset"; private final AtlasGraph graph; private final AtlasGremlinQueryProvider gremlinQueryProvider; @@ -96,6 +102,10 @@ public class EntityLineageService implements AtlasLineageService { private final VertexEdgeCache vertexEdgeCache; private static final List FETCH_ENTITY_ATTRIBUTES = Arrays.asList(ATTRIBUTE_NAME_GUID, QUALIFIED_NAME, NAME); + public static final HashMap LINEAGE_MAP = new HashMap(){{ + put(LINEAGE_TYPE_DATASET_PROCESS_LINEAGE, new String[]{PROCESS_INPUTS_EDGE, PROCESS_OUTPUTS_EDGE}); + put(LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE, new String[]{OUTPUT_PORT_EDGE, INPUT_PORT_EDGE}); + }}; @Inject EntityLineageService(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, VertexEdgeCache vertexEdgeCache, EntityGraphRetriever entityRetriever) { @@ -177,9 +187,12 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getAtlasLineageInfo"); RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); + RequestContext.get().setLineageInputLabel(LINEAGE_MAP.get(lineageOnDemandRequest.getLineageType())[0]); + RequestContext.get().setLineageOutputLabel(LINEAGE_MAP.get(lineageOnDemandRequest.getLineageType())[1]); + AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); + EntityValidationResult entityValidationResult = validateAndGetEntityTypeMap(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, entityValidationResult); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -194,6 +207,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListRequest lineageListRequest) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageListInfoOnDemand"); + RequestContext.get().setLineageInputLabel(LINEAGE_MAP.get(lineageListRequest.getLineageType())[0]); + RequestContext.get().setLineageOutputLabel(LINEAGE_MAP.get(lineageListRequest.getLineageType())[1]); AtlasLineageListInfo ret = new AtlasLineageListInfo(new ArrayList<>()); RequestContext.get().setRelationAttrsForSearch(lineageListRequest.getRelationAttributes()); @@ -204,20 +219,59 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { + public class EntityValidationResult { + public final boolean isProcess; + public final boolean isDataSet; + public final boolean isConnection; + public final boolean isConnectionProcess; + public final boolean isDataProduct; + + public EntityValidationResult(boolean isProcess, boolean isDataSet, boolean isConnection, boolean isConnectionProcess, boolean isDataProduct) { + this.isProcess = isProcess; + this.isDataSet = isDataSet; + this.isConnection = isConnection; + this.isConnectionProcess = isConnectionProcess; + this.isDataProduct = isDataProduct; + } + + public boolean CheckIfConnectorVertex(String lineageType){ + + if(lineageType.equals(LINEAGE_TYPE_PRODUCT_ASSET_LINEAGE)){ + return !isDataProduct; + } + + if(isDataSet || isConnection){ + return false; + } + return true; + } + } + + private EntityValidationResult validateAndGetEntityTypeMap(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } + HashMap dataTypeMap = new HashMap<>(); boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + boolean isDataProduct = entityType.getTypeName().equals(DATA_PRODUCT_ENTITY_TYPE); + boolean isConnectionProcess = false; + boolean isDataSet = false; + boolean isConnection = false; if (!isProcess) { - boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); - if (!isDataSet) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE); + if(!isConnectionProcess){ + isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); + if (!isDataSet) { + isConnection = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_ENTITY_TYPE); + if(!isConnection){ + throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + } + } } } - return !isProcess; + return new EntityValidationResult(isProcess, isDataSet, isConnection, isConnectionProcess, isDataProduct); } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -282,7 +336,7 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); @@ -299,7 +353,10 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - if (isDataSet) { + + boolean isConnectorVertex = entityValidationResult.CheckIfConnectorVertex(atlasLineageOnDemandContext.getLineageType()); + + if (!isConnectorVertex) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); @@ -332,6 +389,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); int nextLevel = isInput ? level - 1: level + 1; while (processEdges.hasNext()) { @@ -346,7 +404,7 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI continue; } - boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(lineageInputLabel); if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, new HashSet<>())) { break; } else { @@ -371,19 +429,21 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i if (depth != 0) { // base condition of recursion for depth AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand"); AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); int nextLevel = isInput ? level - 1: level + 1; // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = GraphHelper.getActiveEdges(datasetVertex, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE, IN); + Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? lineageOutputLabel : lineageInputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { AtlasEdge incomingEdge = incomingEdges.next(); - AtlasVertex processVertex = incomingEdge.getOutVertex(); + AtlasVertex connectorVertex = incomingEdge.getOutVertex(); - if (!vertexMatchesEvaluation(processVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { + if (!vertexMatchesEvaluation(connectorVertex, atlasLineageOnDemandContext) || !edgeMatchesEvaluation(incomingEdge, atlasLineageOnDemandContext)) { continue; } @@ -404,7 +464,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = GraphHelper.getActiveEdges(processVertex, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE, OUT); + Iterator outgoingEdges = GraphHelper.getActiveEdges(connectorVertex, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE, OUT); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -415,11 +475,11 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i continue; } - if (checkForOffset(outgoingEdge, processVertex, atlasLineageOnDemandContext, ret)) { + if (checkForOffset(outgoingEdge, connectorVertex, atlasLineageOnDemandContext, ret)) { continue; } if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, entitiesTraversed, direction, visitedVertices)) { - String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); + String processGuid = AtlasGraphUtilsV2.getIdFromVertex(connectorVertex); LineageInfoOnDemand entityOnDemandInfo = ret.getRelationsOnDemand().get(processGuid); if (entityOnDemandInfo == null) continue; @@ -456,6 +516,7 @@ private static void setEntityLimitReachedFlag(boolean isInput, AtlasLineageOnDem private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext lineageListContext, AtlasLineageListInfo ret) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesUsingBFS"); + String lineageType = lineageListContext.getLineageType(); Set visitedVertices = new HashSet<>(); visitedVertices.add(baseGuid); Set skippedVertices = new HashSet<>(); @@ -466,11 +527,13 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); + EntityValidationResult entityValidationResult = validateAndGetEntityTypeMap(baseGuid); + + boolean isNotConnecterVertex = entityValidationResult.CheckIfConnectorVertex(lineageType); // Get the neighbors for the current node - enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); + enqueueNeighbours(baseVertex, entityValidationResult, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; - int currentLevel = isBaseNodeDataset? 0: 1; + int currentLevel = isNotConnecterVertex? 0: 1; if(lineageListContext.getImmediateNeighbours()){ // Add the current node and its neighbors to the result @@ -481,7 +544,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line currentDepth++; // update level at every alternate depth - if ((isBaseNodeDataset && currentDepth % 2 != 0) || (!isBaseNodeDataset && currentDepth % 2 == 0)) + if ((isNotConnecterVertex && currentDepth % 2 != 0) || (!isNotConnecterVertex && currentDepth % 2 == 0)) currentLevel++; int entitiesInCurrentDepth = traversalQueue.size(); @@ -494,20 +557,20 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); + EntityValidationResult currentEntityValidationResult = validateAndGetEntityTypeMap(currentGUID); if (!lineageListContext.evaluateVertexFilter(currentVertex)) { - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); + enqueueNeighbours(currentVertex, currentEntityValidationResult, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } if (checkOffsetAndSkipEntity(lineageListContext, ret)) { skippedVertices.add(currentGUID); - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); + enqueueNeighbours(currentVertex, currentEntityValidationResult, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; } lineageListContext.incrementEntityCount(); // Get the neighbors for the current node - enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); + enqueueNeighbours(currentVertex, currentEntityValidationResult, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); // Add the current node and its neighbors to the result appendToResult(currentVertex, lineageListContext, ret, currentLevel); @@ -531,16 +594,21 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line RequestContext.get().endMetricRecord(metricRecorder); } - private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, + private void enqueueNeighbours(AtlasVertex currentVertex, EntityValidationResult entityValidationResult, AtlasLineageListContext lineageListContext, Queue traversalQueue, Set visitedVertices, Set skippedVertices, Map> lineageParentsForEntityMap, Map> lineageChildrenForEntityMap) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdges = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdges"); Iterator edges; - if (isDataset) - edges = GraphHelper.getActiveEdges(currentVertex, isInputDirection(lineageListContext) ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE, IN); + + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + String lineageType = lineageListContext.getLineageType(); + boolean isConnecterVertex = entityValidationResult.CheckIfConnectorVertex(lineageType); + if (!isConnecterVertex) + edges = currentVertex.getEdges(IN, isInputDirection(lineageListContext) ? lineageOutputLabel : lineageInputLabel).iterator(); else - edges = GraphHelper.getActiveEdges(currentVertex, isInputDirection(lineageListContext) ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE, OUT); + edges = currentVertex.getEdges(OUT, isInputDirection(lineageListContext) ? lineageInputLabel : lineageOutputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdges); while (edges.hasNext()) { @@ -548,7 +616,7 @@ private void enqueueNeighbours(AtlasVertex currentVertex, boolean isDataset, if (!lineageListContext.evaluateTraversalFilter(currentEdge)) continue; AtlasVertex neighbourVertex; - if (isDataset) + if (!isConnecterVertex) neighbourVertex = currentEdge.getOutVertex(); else neighbourVertex = currentEdge.getInVertex(); @@ -743,19 +811,21 @@ else if (!isInput && ! isInVertexVisited) } private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex inVertex, LineageInfoOnDemand inLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(inVertex, IN, PROCESS_INPUTS_EDGE, atlasLineageOnDemandContext); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + List filteredEdges = getFilteredAtlasEdges(inVertex, IN, lineageInputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); - inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } + inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { - List filteredEdges = getFilteredAtlasEdges(outVertex, IN, PROCESS_OUTPUTS_EDGE, atlasLineageOnDemandContext); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + List filteredEdges = getFilteredAtlasEdges(outVertex, IN, lineageOutputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); - outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } + outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } private List getFilteredAtlasEdges(AtlasVertex outVertex, AtlasEdgeDirection direction, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext) { @@ -911,6 +981,8 @@ private AtlasLineageInfo getLineageInfo(AtlasLineageContext lineageContext, Line private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getLineageInfoV2"); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); int depth = lineageContext.getDepth(); String guid = lineageContext.getGuid(); LineageDirection direction = lineageContext.getDirection(); @@ -937,7 +1009,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == INPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, PROCESS_INPUTS_EDGE).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, lineageInputLabel).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(INPUT, hasMoreChildren(qualifyingEdges))); @@ -952,7 +1024,7 @@ private AtlasLineageInfo getLineageInfoV2(AtlasLineageContext lineageContext) th } if (direction == OUTPUT || direction == BOTH) { - Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, PROCESS_OUTPUTS_EDGE).iterator(); + Iterator processEdges = vertexEdgeCache.getEdges(processVertex, OUT, lineageOutputLabel).iterator(); List qualifyingEdges = getQualifyingProcessEdges(processEdges, lineageContext); ret.setHasChildrenForDirection(getGuid(processVertex), new LineageChildrenInfo(OUTPUT, hasMoreChildren(qualifyingEdges))); @@ -1004,12 +1076,13 @@ private int getLineageMaxNodeAllowedCount() { private String getEdgeLabel(AtlasEdge edge) { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getEdgeLabel"); try { + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (isLineageOnDemandEnabled()) { return getEdgeLabelFromGuids(isInputEdge, inGuid, outGuid); @@ -1184,9 +1257,11 @@ private List> getUnvisitedProcessEdgesWithOutputVertexId return Collections.emptyList(); } + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); List> unvisitedProcessEdgesWithOutputVertexIds = new ArrayList<>(); - Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE); + Iterable outgoingEdges = vertexEdgeCache.getEdges(processVertex, OUT, isInput ? lineageInputLabel : lineageOutputLabel); for (AtlasEdge outgoingEdge : outgoingEdges) { AtlasVertex outputVertex = outgoingEdge.getInVertex(); @@ -1273,7 +1348,9 @@ private void addLimitlessVerticesToResult(boolean isInput, int depth, Set processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + List processEdges = vertexEdgeCache.getEdges(currentVertex, IN, isInput ? lineageOutputLabel : lineageInputLabel); // Filter lineages based on ignored process types processEdges = CollectionUtils.isNotEmpty(lineageContext.getIgnoredProcesses()) ? @@ -1294,8 +1371,10 @@ private void processLastLevel(AtlasVertex currentVertex, boolean isInput, AtlasL private boolean childHasOnlySelfCycle(AtlasVertex processVertex, AtlasVertex currentVertex, boolean isInput) { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("childHasSelfCycle"); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); Iterator processEdgeIterator; - processEdgeIterator = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + processEdgeIterator = processVertex.getEdges(OUT, isInput ? lineageInputLabel : lineageOutputLabel).iterator(); Set processOutputEdges = new HashSet<>(); while (processEdgeIterator.hasNext()) { processOutputEdges.add(processEdgeIterator.next()); @@ -1312,7 +1391,10 @@ private List getEdgesOfProcess(boolean isInput, AtlasLineageContext l return Collections.emptyList(); } - return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE) + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + + return vertexEdgeCache.getEdges(processVertex, OUT, isInput ? lineageInputLabel : lineageOutputLabel) .stream() .filter(edge -> shouldProcessEdge(lineageContext, edge) && vertexMatchesEvaluation(edge.getInVertex(), lineageContext)) .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) @@ -1352,8 +1434,10 @@ private boolean shouldProcessEdge(AtlasLineageContext lineageContext, AtlasEdge } private List getEdgesOfCurrentVertex(AtlasVertex currentVertex, boolean isInput, AtlasLineageContext lineageContext) { + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); return vertexEdgeCache - .getEdges(currentVertex, IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE) + .getEdges(currentVertex, IN, isInput ? lineageOutputLabel : lineageInputLabel) .stream() .sorted(Comparator.comparing(edge -> edge.getProperty("_r__guid", String.class))) .filter(edge -> shouldProcessEdge(lineageContext, edge)) @@ -1445,6 +1529,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = incomingEdge.getInVertex(); AtlasVertex outVertex = outgoingEdge.getInVertex(); AtlasVertex processVertex = outgoingEdge.getOutVertex(); @@ -1452,7 +1537,7 @@ private boolean processVirtualEdge(final AtlasEdge incomingEdge, final AtlasEdge String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String processGuid = AtlasGraphUtilsV2.getIdFromVertex(processVertex); String relationGuid = null; - boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = incomingEdge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(inVertex, lineageContext.getAttributes()); @@ -1484,6 +1569,7 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, AtlasLineageContext lineageContext) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("processEdges"); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); @@ -1511,14 +1597,14 @@ private void processEdges(final AtlasEdge incomingEdge, AtlasEdge outgoingEdge, } String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(incomingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (incomingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE)) { + if (incomingEdge.getLabel().equalsIgnoreCase(lineageInputLabel)) { relations.add(new LineageRelation(leftGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, leftGuid, relationGuid)); } relationGuid = AtlasGraphUtilsV2.getEncodedProperty(outgoingEdge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - if (outgoingEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE)) { + if (outgoingEdge.getLabel().equalsIgnoreCase(lineageInputLabel)) { relations.add(new LineageRelation(rightGuid, processGuid, relationGuid)); } else { relations.add(new LineageRelation(processGuid, rightGuid, relationGuid)); @@ -1533,12 +1619,13 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, final Map entities = lineageInfo.getGuidEntityMap(); final Set relations = lineageInfo.getRelations(); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1561,12 +1648,13 @@ private void processEdge(final AtlasEdge edge, AtlasLineageInfo lineageInfo, private void processEdge(final AtlasEdge edge, final Map entities, final Set relations, AtlasLineageContext lineageContext) throws AtlasBaseException { //Backward compatibility method + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); AtlasVertex inVertex = edge.getInVertex(); AtlasVertex outVertex = edge.getOutVertex(); String inGuid = AtlasGraphUtilsV2.getIdFromVertex(inVertex); String outGuid = AtlasGraphUtilsV2.getIdFromVertex(outVertex); String relationGuid = AtlasGraphUtilsV2.getEncodedProperty(edge, RELATIONSHIP_GUID_PROPERTY_KEY, String.class); - boolean isInputEdge = edge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE); + boolean isInputEdge = edge.getLabel().equalsIgnoreCase(lineageInputLabel); if (!entities.containsKey(inGuid)) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(inVertex, lineageContext.getAttributes()); @@ -1594,6 +1682,7 @@ private void processEdge(final AtlasEdge edge, final Map> getRemovedElementsMap() { return removedElementsMap; } From aba0bfe06603d9e068ab83ead8938289e4f82207 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 8 Sep 2025 02:29:33 +0530 Subject: [PATCH 5/9] lineage: corrected lineage ondemand logic --- .../org/apache/atlas/discovery/EntityLineageService.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index ba72e95c2cb..2f0c2d44b6d 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -338,6 +338,9 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); + String lineageInputLabel = RequestContext.get().getLineageInputLabel(); + String lineageOutputLabel = RequestContext.get().getLineageOutputLabel(); + String lineageType = atlasLineageOnDemandContext.getLineageType(); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); AtlasLineageOnDemandInfo.LineageDirection direction = lineageConstraintsByGuid.getDirection(); @@ -365,15 +368,15 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); - } else { + } else { AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageInputLabel).iterator(); traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, lineageOutputLabel).iterator(); traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); } } From 1c7375b5c1eb61d1de7dc3a6d65aaba366a44400 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 8 Sep 2025 02:36:52 +0530 Subject: [PATCH 6/9] lineage: resolved error --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 2f0c2d44b6d..a216436ba1e 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -467,7 +467,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = GraphHelper.getActiveEdges(connectorVertex, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE, OUT); + Iterator outgoingEdges = connectorVertex.getEdges(OUT, isInput ? lineageInputLabel : lineageOutputLabel).iterator(); RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { From 013983c86855115c035ec3224f21764fc61968fa Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 8 Sep 2025 10:13:55 +0530 Subject: [PATCH 7/9] lineage: corrected variable --- .../org/apache/atlas/discovery/EntityLineageService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a216436ba1e..73874e70f9e 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -532,11 +532,11 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); EntityValidationResult entityValidationResult = validateAndGetEntityTypeMap(baseGuid); - boolean isNotConnecterVertex = entityValidationResult.CheckIfConnectorVertex(lineageType); + boolean isNotConnectorVertex = entityValidationResult.CheckIfConnectorVertex(lineageType); // Get the neighbors for the current node enqueueNeighbours(baseVertex, entityValidationResult, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; - int currentLevel = isNotConnecterVertex? 0: 1; + int currentLevel = isNotConnectorVertex? 0: 1; if(lineageListContext.getImmediateNeighbours()){ // Add the current node and its neighbors to the result @@ -547,7 +547,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line currentDepth++; // update level at every alternate depth - if ((isNotConnecterVertex && currentDepth % 2 != 0) || (!isNotConnecterVertex && currentDepth % 2 == 0)) + if ((isNotConnectorVertex && currentDepth % 2 != 0) || (!isNotConnectorVertex && currentDepth % 2 == 0)) currentLevel++; int entitiesInCurrentDepth = traversalQueue.size(); From 6dd916fb5b6e3a1c616e32b0498b129e98fd0eb1 Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 8 Sep 2025 12:14:21 +0530 Subject: [PATCH 8/9] lineage: resolved logical error in downstream fuc --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 73874e70f9e..2ccac4f5078 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -818,8 +818,8 @@ private void setHasDownstream(AtlasLineageOnDemandContext atlasLineageOnDemandCo List filteredEdges = getFilteredAtlasEdges(inVertex, IN, lineageInputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { inLineageInfo.setHasDownstream(true); + inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } - inLineageInfo.setTotalOutputRelationsCount(filteredEdges.size()); } private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasVertex outVertex, LineageInfoOnDemand outLineageInfo) { @@ -827,8 +827,8 @@ private void setHasUpstream(AtlasLineageOnDemandContext atlasLineageOnDemandCont List filteredEdges = getFilteredAtlasEdges(outVertex, IN, lineageOutputLabel, atlasLineageOnDemandContext); if (!filteredEdges.isEmpty()) { outLineageInfo.setHasUpstream(true); + outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } - outLineageInfo.setTotalInputRelationsCount(filteredEdges.size()); } private List getFilteredAtlasEdges(AtlasVertex outVertex, AtlasEdgeDirection direction, String processEdgeLabel, AtlasLineageOnDemandContext atlasLineageOnDemandContext) { From 88a738b3f375c7111cc82aa2ca3d794576428f3b Mon Sep 17 00:00:00 2001 From: PRATHAM2002-DS Date: Mon, 8 Sep 2025 16:48:48 +0530 Subject: [PATCH 9/9] lineage: correct fetch connector vertex --- .../java/org/apache/atlas/discovery/EntityLineageService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index 2ccac4f5078..d234162a67a 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -532,7 +532,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); EntityValidationResult entityValidationResult = validateAndGetEntityTypeMap(baseGuid); - boolean isNotConnectorVertex = entityValidationResult.CheckIfConnectorVertex(lineageType); + boolean isNotConnectorVertex = !entityValidationResult.CheckIfConnectorVertex(lineageType); // Get the neighbors for the current node enqueueNeighbours(baseVertex, entityValidationResult, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0;