-
Notifications
You must be signed in to change notification settings - Fork 9
MLH-1226 track denorm attributes and add to kafka #5392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
431627f
4aa11fb
d71bd78
637711d
7ba46f8
e75eff6
a6faa17
ab75f80
478882b
945da59
3ba9eca
c20b9f4
e44618d
6a010e7
e048edb
f918012
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ on: | |
- beta | ||
- master | ||
- staging | ||
- mlh-1226-denorm-attrs-push-kafka | ||
|
||
jobs: | ||
build: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,12 +19,14 @@ | |
|
||
import java.util.*; | ||
|
||
import org.apache.atlas.RequestContext; | ||
import org.apache.atlas.repository.graphdb.AtlasEdge; | ||
import org.apache.atlas.repository.graphdb.AtlasElement; | ||
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; | ||
import org.apache.atlas.repository.graphdb.AtlasVertex; | ||
import org.apache.atlas.repository.graphdb.janus.graphson.AtlasGraphSONMode; | ||
import org.apache.atlas.repository.graphdb.janus.graphson.AtlasGraphSONUtility; | ||
import org.apache.commons.lang.StringUtils; | ||
import org.apache.tinkerpop.gremlin.structure.Element; | ||
import org.apache.tinkerpop.gremlin.structure.Property; | ||
import org.codehaus.jettison.json.JSONException; | ||
|
@@ -35,6 +37,9 @@ | |
import org.janusgraph.core.SchemaViolationException; | ||
import org.janusgraph.core.JanusGraphElement; | ||
|
||
import static org.apache.atlas.type.Constants.GUID_PROPERTY_KEY; | ||
import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX; | ||
|
||
/** | ||
* Janus implementation of AtlasElement. | ||
* | ||
|
@@ -109,36 +114,49 @@ public void removeProperty(String propertyName) { | |
while(it.hasNext()) { | ||
Property<String> property = it.next(); | ||
property.remove(); | ||
recordInternalAttribute(propertyName, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
} | ||
|
||
@Override | ||
public void removePropertyValue(String propertyName, Object propertyValue) { | ||
Iterator<? extends Property<Object>> it = getWrappedElement().properties(propertyName); | ||
List<Object> finalValues = new ArrayList<>(); | ||
boolean removedFirst = false; | ||
|
||
while (it.hasNext()) { | ||
Property currentProperty = it.next(); | ||
Object currentPropertyValue = currentProperty.value(); | ||
|
||
if (Objects.equals(currentPropertyValue, propertyValue)) { | ||
if (!removedFirst && Objects.equals(currentPropertyValue, propertyValue)) { | ||
currentProperty.remove(); | ||
break; | ||
removedFirst = true; | ||
} else { | ||
finalValues.add(currentPropertyValue); | ||
} | ||
} | ||
|
||
recordInternalAttribute(propertyName, finalValues); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Property Removal Causes Incorrect Context UpdatesThe |
||
} | ||
|
||
@Override | ||
public void removeAllPropertyValue(String propertyName, Object propertyValue) { | ||
Iterator<? extends Property<Object>> it = getWrappedElement().properties(propertyName); | ||
List<Object> finalValues = new ArrayList<>(); | ||
|
||
|
||
while (it.hasNext()) { | ||
Property currentProperty = it.next(); | ||
Object currentPropertyValue = currentProperty.value(); | ||
|
||
if (Objects.equals(currentPropertyValue, propertyValue)) { | ||
currentProperty.remove(); | ||
} else { | ||
finalValues.add(currentPropertyValue); | ||
} | ||
} | ||
|
||
recordInternalAttribute(propertyName, finalValues); | ||
} | ||
|
||
@Override | ||
|
@@ -151,6 +169,7 @@ public void setProperty(String propertyName, Object value) { | |
} | ||
} else { | ||
getWrappedElement().property(propertyName, value); | ||
recordInternalAttribute(propertyName, value); | ||
} | ||
} catch(SchemaViolationException e) { | ||
throw new AtlasSchemaViolationException(e); | ||
|
@@ -308,7 +327,6 @@ public void setPropertyFromElementsIds(String propertyName, List<AtlasElement> v | |
@Override | ||
public void setPropertyFromElementId(String propertyName, AtlasElement value) { | ||
setProperty(propertyName, value.getId().toString()); | ||
|
||
} | ||
|
||
|
||
|
@@ -317,4 +335,20 @@ public boolean isIdAssigned() { | |
return true; | ||
} | ||
|
||
private void recordInternalAttribute(String propertyName, Object finalValue) { | ||
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) { | ||
RequestContext context = RequestContext.get(); | ||
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class); | ||
|
||
if (StringUtils.isNotEmpty(entityGuid)) { | ||
if (context.getAllInternalAttributesMap().get(entityGuid) != null) { | ||
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, finalValue); | ||
} else { | ||
Map<String, Object> map = new HashMap<>(); | ||
map.put(propertyName, finalValue); | ||
context.getAllInternalAttributesMap().put(entityGuid, map); | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -307,6 +307,16 @@ private AtlasEntityHeaderWithRelations toNotificationHeader(AtlasEntity entity) | |
} | ||
} | ||
|
||
// fill the internal properties like __glossary, __meanings etc. (ES Isolation) | ||
RequestContext context = RequestContext.get(); | ||
Map<String, Object> allInternalAttributesMap = context.getAllInternalAttributesMap().get(entity.getGuid()); | ||
if (MapUtils.isNotEmpty(allInternalAttributesMap)) { | ||
for (String key : allInternalAttributesMap.keySet()) { | ||
Object value = allInternalAttributesMap.get(key); | ||
ret.setAttribute(key, value); | ||
|
||
} | ||
} | ||
|
||
//Add relationship attributes which has isOptional as false | ||
Map<String, Object> rel = new HashMap<>(); | ||
for (Map<String, AtlasAttribute> attrs : entityType.getRelationshipAttributes().values()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: CI Workflow Contains Hardcoded Feature Branch Trigger
The CI workflow includes a hardcoded trigger for the
mlh-1226-denorm-attrs-push-kafka
feature branch. This appears to be temporary testing configuration that was committed inadvertently, as feature branch names are typically not hardcoded in shared CI workflows.