Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ on:
- beta
- master
- staging
- mlh-1226-denorm-attrs-push-kafka
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: CI Workflow Contains Hardcoded Feature Branch Trigger

The CI workflow includes a hardcoded trigger for the mlh-1226-denorm-attrs-push-kafka feature branch. This appears to be temporary testing configuration that was committed inadvertently, as feature branch names are typically not hardcoded in shared CI workflows.

Fix in Cursor Fix in Web


jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import java.util.*;

import org.apache.atlas.RequestContext;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.janus.graphson.AtlasGraphSONMode;
import org.apache.atlas.repository.graphdb.janus.graphson.AtlasGraphSONUtility;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.codehaus.jettison.json.JSONException;
Expand All @@ -35,6 +37,9 @@
import org.janusgraph.core.SchemaViolationException;
import org.janusgraph.core.JanusGraphElement;

import static org.apache.atlas.type.Constants.GUID_PROPERTY_KEY;
import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX;

/**
* Janus implementation of AtlasElement.
*
Expand Down Expand Up @@ -105,17 +110,28 @@ public Set<String> getPropertyKeys() {

@Override
public void removeProperty(String propertyName) {
RequestContext context = RequestContext.get();
Iterator<? extends Property<String>> it = getWrappedElement().properties(propertyName);
while(it.hasNext()) {
Property<String> property = it.next();
property.remove();
//fill the map for the entityGuid and remove value
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will collect attrs like __createdBy, __createdAt & other attributes which not denorm attrs. shall we exclude them as well?
or we can even have inclusion list but we will loose future readyness & have to update the inclusion list everytime we introduce new denorm attribute

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do this change after i test this in an environment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK

String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class);
if (StringUtils.isNotEmpty(entityGuid)) {
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, null);
}
}
}
}

@Override
public void removePropertyValue(String propertyName, Object propertyValue) {
RequestContext context = RequestContext.get();
Iterator<? extends Property<Object>> it = getWrappedElement().properties(propertyName);

// we create a map of properties that are only internal
fillInternalPropertiesIfApplicable(propertyName, propertyValue, it, context);
it = getWrappedElement().properties(propertyName);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an idea to avoid iterating twice

while (it.hasNext()) {
Property currentProperty = it.next();
Object currentPropertyValue = currentProperty.value();
Expand All @@ -130,7 +146,8 @@ public void removePropertyValue(String propertyName, Object propertyValue) {
@Override
public void removeAllPropertyValue(String propertyName, Object propertyValue) {
Iterator<? extends Property<Object>> it = getWrappedElement().properties(propertyName);

RequestContext context = RequestContext.get();
fillInternalPropertiesIfApplicable(propertyName, propertyValue, it, context);
while (it.hasNext()) {
Property currentProperty = it.next();
Object currentPropertyValue = currentProperty.value();
Expand All @@ -143,6 +160,7 @@ public void removeAllPropertyValue(String propertyName, Object propertyValue) {

@Override
public void setProperty(String propertyName, Object value) {
RequestContext context = RequestContext.get();
try {
if (value == null) {
Object existingVal = getProperty(propertyName, Object.class);
Expand All @@ -151,6 +169,19 @@ public void setProperty(String propertyName, Object value) {
}
} else {
getWrappedElement().property(propertyName, value);
//fill all internal properties for the entityGuid
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) {
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class);
if (StringUtils.isNotEmpty(entityGuid)) {
if (context.getAllInternalAttributesMap().get(entityGuid) != null) {
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, value);
} else {
Map<String, Object> map = new HashMap<>();
map.put(propertyName, value);
context.getAllInternalAttributesMap().put(entityGuid, map);
}
}
}
}
} catch(SchemaViolationException e) {
throw new AtlasSchemaViolationException(e);
Expand Down Expand Up @@ -317,4 +348,31 @@ public boolean isIdAssigned() {
return true;
}


private void fillInternalPropertiesIfApplicable(String propertyName, Object propertyValue, Iterator<? extends Property<Object>> it, RequestContext context) {
while (it.hasNext()) {
Property currentProperty = it.next();
Object currentPropertyValue = currentProperty.value();

if (!Objects.equals(currentPropertyValue, propertyValue)) {
//fill the map with all iterartor values for the entityGuid
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) {
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class);
if (StringUtils.isNotEmpty(entityGuid)) {
if (context.getAllInternalAttributesMap().get(entityGuid) != null) {
Object value = context.getAllInternalAttributesMap().get(entityGuid);
List<Object> values = new ArrayList<>();
values.add(value);
values.add(currentPropertyValue);
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, values);
} else {
Map<String, Object> map = new HashMap<>();
map.put(propertyName, currentPropertyValue);
context.getAllInternalAttributesMap().put(entityGuid, map);
}
}
}
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Unused Debugging Method Found

The recordInternalAttributeIncremental2 method is unused. It appears to be debugging or experimental code that was accidentally committed, especially as it serves a similar purpose to recordInternalAttributeIncrementalAdd.

Fix in Cursor Fix in Web

}
6 changes: 6 additions & 0 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class RequestContext {
private final Map<String, AtlasEntityHeader> updatedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> deletedEntities = new HashMap<>();
private final Map<String, AtlasEntityHeader> restoreEntities = new HashMap<>();
private final Map<String, Map<String, Object>> allInternalAttributesMap = new HashMap<>();


private Map<String, String> lexoRankCache = null;
Expand Down Expand Up @@ -200,6 +201,7 @@ public void clearCache() {
addedClassificationAndVertices.clear();
esDeferredOperations.clear();
this.cassandraTagOperations.clear();
this.allInternalAttributesMap.clear();

if (metrics != null && !metrics.isEmpty()) {
METRICS.debug(metrics.toString());
Expand Down Expand Up @@ -877,6 +879,10 @@ public boolean isInvokedByLineage() {
return isInvokedByLineage;
}

public Map<String, Map<String, Object>> getAllInternalAttributesMap() {
return allInternalAttributesMap;
}

public class EntityGuidPair {
private final Object entity;
private final String guid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ private AtlasEntityHeaderWithRelations toNotificationHeader(AtlasEntity entity)
}
}

// fill the internal properties like __glossary, __meanings etc. (ES Isolation)
RequestContext context = RequestContext.get();
Map<String, Object> allInternalAttributesMap = context.getAllInternalAttributesMap().get(entity.getGuid());
if (MapUtils.isNotEmpty(allInternalAttributesMap)) {
for (String key : allInternalAttributesMap.keySet()) {
Object value = allInternalAttributesMap.get(key);
ret.setAttribute(key, value);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will put internal attributes in attributes like attributes.__meanings. Nothing wrong in this just wanted to highlight

}
}

//Add relationship attributes which has isOptional as false
Map<String, Object> rel = new HashMap<>();
for (Map<String, AtlasAttribute> attrs : entityType.getRelationshipAttributes().values()) {
Expand Down
Loading