-
Notifications
You must be signed in to change notification settings - Fork 9
MLH-1226 track denorm attributes and add to kafka #5392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Changes from 2 commits
431627f
4aa11fb
d71bd78
637711d
7ba46f8
e75eff6
a6faa17
c20b9f4
e44618d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ on: | |
- beta | ||
- master | ||
- staging | ||
- mlh-1226-denorm-attrs-push-kafka | ||
|
||
jobs: | ||
build: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,12 +19,14 @@ | |
|
||
import java.util.*; | ||
|
||
import org.apache.atlas.RequestContext; | ||
import org.apache.atlas.repository.graphdb.AtlasEdge; | ||
import org.apache.atlas.repository.graphdb.AtlasElement; | ||
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; | ||
import org.apache.atlas.repository.graphdb.AtlasVertex; | ||
import org.apache.atlas.repository.graphdb.janus.graphson.AtlasGraphSONMode; | ||
import org.apache.atlas.repository.graphdb.janus.graphson.AtlasGraphSONUtility; | ||
import org.apache.commons.lang.StringUtils; | ||
import org.apache.tinkerpop.gremlin.structure.Element; | ||
import org.apache.tinkerpop.gremlin.structure.Property; | ||
import org.codehaus.jettison.json.JSONException; | ||
|
@@ -35,6 +37,9 @@ | |
import org.janusgraph.core.SchemaViolationException; | ||
import org.janusgraph.core.JanusGraphElement; | ||
|
||
import static org.apache.atlas.type.Constants.GUID_PROPERTY_KEY; | ||
import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX; | ||
|
||
/** | ||
* Janus implementation of AtlasElement. | ||
* | ||
|
@@ -105,17 +110,28 @@ public Set<String> getPropertyKeys() { | |
|
||
@Override | ||
public void removeProperty(String propertyName) { | ||
RequestContext context = RequestContext.get(); | ||
sriram-atlan marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
Iterator<? extends Property<String>> it = getWrappedElement().properties(propertyName); | ||
while(it.hasNext()) { | ||
Property<String> property = it.next(); | ||
property.remove(); | ||
//fill the map for the entityGuid and remove value | ||
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) { | ||
|
||
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class); | ||
if (StringUtils.isNotEmpty(entityGuid)) { | ||
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, null); | ||
} | ||
} | ||
nikhilbonte21 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
@Override | ||
public void removePropertyValue(String propertyName, Object propertyValue) { | ||
RequestContext context = RequestContext.get(); | ||
Iterator<? extends Property<Object>> it = getWrappedElement().properties(propertyName); | ||
|
||
// we create a map of properties that are only internal | ||
fillInternalPropertiesIfApplicable(propertyName, propertyValue, it, context); | ||
it = getWrappedElement().properties(propertyName); | ||
|
||
while (it.hasNext()) { | ||
Property currentProperty = it.next(); | ||
Object currentPropertyValue = currentProperty.value(); | ||
|
@@ -130,7 +146,8 @@ public void removePropertyValue(String propertyName, Object propertyValue) { | |
@Override | ||
public void removeAllPropertyValue(String propertyName, Object propertyValue) { | ||
Iterator<? extends Property<Object>> it = getWrappedElement().properties(propertyName); | ||
|
||
RequestContext context = RequestContext.get(); | ||
fillInternalPropertiesIfApplicable(propertyName, propertyValue, it, context); | ||
while (it.hasNext()) { | ||
Property currentProperty = it.next(); | ||
Object currentPropertyValue = currentProperty.value(); | ||
|
@@ -143,6 +160,7 @@ public void removeAllPropertyValue(String propertyName, Object propertyValue) { | |
|
||
@Override | ||
public void setProperty(String propertyName, Object value) { | ||
RequestContext context = RequestContext.get(); | ||
try { | ||
if (value == null) { | ||
Object existingVal = getProperty(propertyName, Object.class); | ||
|
@@ -151,6 +169,19 @@ public void setProperty(String propertyName, Object value) { | |
} | ||
} else { | ||
getWrappedElement().property(propertyName, value); | ||
//fill all internal properties for the entityGuid | ||
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) { | ||
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class); | ||
if (StringUtils.isNotEmpty(entityGuid)) { | ||
if (context.getAllInternalAttributesMap().get(entityGuid) != null) { | ||
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, value); | ||
} else { | ||
Map<String, Object> map = new HashMap<>(); | ||
map.put(propertyName, value); | ||
context.getAllInternalAttributesMap().put(entityGuid, map); | ||
} | ||
} | ||
} | ||
} | ||
} catch(SchemaViolationException e) { | ||
throw new AtlasSchemaViolationException(e); | ||
|
@@ -317,4 +348,31 @@ public boolean isIdAssigned() { | |
return true; | ||
} | ||
|
||
|
||
private void fillInternalPropertiesIfApplicable(String propertyName, Object propertyValue, Iterator<? extends Property<Object>> it, RequestContext context) { | ||
while (it.hasNext()) { | ||
Property currentProperty = it.next(); | ||
Object currentPropertyValue = currentProperty.value(); | ||
|
||
if (!Objects.equals(currentPropertyValue, propertyValue)) { | ||
//fill the map with all iterartor values for the entityGuid | ||
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) { | ||
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class); | ||
if (StringUtils.isNotEmpty(entityGuid)) { | ||
if (context.getAllInternalAttributesMap().get(entityGuid) != null) { | ||
Object value = context.getAllInternalAttributesMap().get(entityGuid); | ||
List<Object> values = new ArrayList<>(); | ||
values.add(value); | ||
values.add(currentPropertyValue); | ||
context.getAllInternalAttributesMap().get(entityGuid).put(propertyName, values); | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} else { | ||
Map<String, Object> map = new HashMap<>(); | ||
map.put(propertyName, currentPropertyValue); | ||
context.getAllInternalAttributesMap().put(entityGuid, map); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -307,6 +307,16 @@ private AtlasEntityHeaderWithRelations toNotificationHeader(AtlasEntity entity) | |
} | ||
} | ||
|
||
// fill the internal properties like __glossary, __meanings etc. (ES Isolation) | ||
RequestContext context = RequestContext.get(); | ||
Map<String, Object> allInternalAttributesMap = context.getAllInternalAttributesMap().get(entity.getGuid()); | ||
if (MapUtils.isNotEmpty(allInternalAttributesMap)) { | ||
for (String key : allInternalAttributesMap.keySet()) { | ||
Object value = allInternalAttributesMap.get(key); | ||
ret.setAttribute(key, value); | ||
|
||
} | ||
} | ||
|
||
//Add relationship attributes which has isOptional as false | ||
Map<String, Object> rel = new HashMap<>(); | ||
for (Map<String, AtlasAttribute> attrs : entityType.getRelationshipAttributes().values()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: CI Workflow Contains Hardcoded Feature Branch Trigger
The CI workflow includes a hardcoded trigger for the
mlh-1226-denorm-attrs-push-kafka
feature branch. This appears to be temporary testing configuration that was committed inadvertently, as feature branch names are typically not hardcoded in shared CI workflows.