Skip to content

Commit 96d902c

Browse files
authored
Merge branch 'staging' into mlh-1240-improve-cm-refresh
2 parents 03a8e02 + 5ebe0c3 commit 96d902c

File tree

14 files changed

+464
-16
lines changed

14 files changed

+464
-16
lines changed

.github/workflows/maven.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ on:
2525
- beta
2626
- master
2727
- staging
28-
- mlh-1226-denorm-attrs-push-kafka
29-
- mlh-1240-improve-cm-refresh
3028

3129
jobs:
3230
build:

auth-agents-common/src/main/java/org/apache/atlas/policytransformer/CachePolicyTransformerImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,9 @@ private List<RangerPolicyDelta> getRangerPolicyDelta(AtlasEntityHeader service,
387387
List<RangerPolicyDelta> deletedPolicyDeltas = new ArrayList<>();
388388
for (String policyGuid : policyGuids) {
389389
Integer deltaChangeType = auditEventToDeltaChangeType.get(policyChanges.get(policyGuid));
390+
if (deltaChangeType == null) {
391+
continue;
392+
}
390393
if (deltaChangeType == RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
391394
RangerPolicy deletedPolicy = new RangerPolicy();
392395
deletedPolicy.setGuid(policyGuid);
@@ -415,6 +418,10 @@ public Map<String, EntityAuditActionV2> createPolicyChangeMap(String serviceName
415418
Map<String, EntityAuditActionV2> policyChanges = new HashMap<>();
416419
for (EntityAuditEventV2 event : events) {
417420
if (POLICY_ENTITY_TYPE.equals(event.getTypeName()) && !policyChanges.containsKey(event.getEntityId())) {
421+
if (auditEventToDeltaChangeType.get(event.getAction()) == null) {
422+
LOG.warn("PolicyDelta: {}: No delta type found for audit_event={} guid={}", serviceName, event.getAction(), event.getEntityId());
423+
continue;
424+
}
418425
policyChanges.put(event.getEntityId(), event.getAction());
419426
}
420427
}

auth-audits/src/main/java/org/apache/atlas/audit/destination/Log4JAuditDestination.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class Log4JAuditDestination extends AuditDestination {
4646
private static final String AUTH_AUDIT_RESOURCE= "resource";
4747
private static final String AUTH_AUDIT_CLIENT_IP = "cliIP";
4848
private static final String AUTH_AUDIT_AGENT = "agent";
49+
private static final String AUTH_AUDIT_ENFORCER = "enforcer";
4950

5051

5152
public Log4JAuditDestination() {
@@ -105,6 +106,7 @@ private void recordLogAttributes(AuditEventBase eventBase) {
105106
MDC.put(AUTH_AUDIT_RESULT, String.valueOf(event.getAccessResult()));
106107
MDC.put(AUTH_AUDIT_CLIENT_IP, event.getClientIP());
107108
MDC.put(AUTH_AUDIT_AGENT, event.getAgentId());
109+
MDC.put(AUTH_AUDIT_ENFORCER, event.getAclEnforcer());
108110
}
109111
}
110112

@@ -118,6 +120,7 @@ private void clearLogAttributes(AuditEventBase event) {
118120
MDC.remove(AUTH_AUDIT_RESULT);
119121
MDC.remove(AUTH_AUDIT_CLIENT_IP);
120122
MDC.remove(AUTH_AUDIT_AGENT);
123+
MDC.remove(AUTH_AUDIT_ENFORCER);
121124
}
122125
}
123126

graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusElement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ protected void recordInternalAttribute(String propertyName, Object finalValue) {
352352
}
353353
}
354354

355-
protected void recordInternalAttributeIncrementalAdd(String propertyName, Object value, Class cardinality) {
355+
protected void recordInternalAttributeIncrementalAdd(String propertyName, Class cardinality) {
356356
if (propertyName.startsWith(INTERNAL_PROPERTY_KEY_PREFIX)) {
357357
String entityGuid = this.getProperty(GUID_PROPERTY_KEY, String.class);
358358

graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public AtlasJanusVertex(AtlasJanusGraph graph, Vertex source) {
5656
public <T> void addProperty(String propertyName, T value) {
5757
try {
5858
getWrappedElement().property(VertexProperty.Cardinality.set, propertyName, value);
59-
recordInternalAttributeIncrementalAdd(propertyName, value, Set.class);
59+
recordInternalAttributeIncrementalAdd(propertyName, Set.class);
6060
} catch(SchemaViolationException e) {
6161
throw new AtlasSchemaViolationException(e);
6262
}
@@ -66,7 +66,7 @@ public <T> void addProperty(String propertyName, T value) {
6666
public <T> void addListProperty(String propertyName, T value) {
6767
try {
6868
getWrappedElement().property(VertexProperty.Cardinality.list, propertyName, value);
69-
recordInternalAttributeIncrementalAdd(propertyName, value, List.class);
69+
recordInternalAttributeIncrementalAdd(propertyName, List.class);
7070
} catch(SchemaViolationException e) {
7171
throw new AtlasSchemaViolationException(e);
7272
}

intg/src/main/java/org/apache/atlas/model/typedef/AtlasStructDef.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ public void setOptions(Map<String, String> options) {
557557
public boolean isSoftReferenced() {
558558
return this.options != null &&
559559
getOptions().containsKey(AtlasAttributeDef.ATTRDEF_OPTION_SOFT_REFERENCE) &&
560-
getOptions().get(AtlasAttributeDef.ATTRDEF_OPTION_SOFT_REFERENCE).equals(STRING_TRUE);
560+
STRING_TRUE.equalsIgnoreCase(getOptions().get(AtlasAttributeDef.ATTRDEF_OPTION_SOFT_REFERENCE));
561561
}
562562

563563
@JsonIgnore

repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1841,13 +1841,11 @@ private void updateAssetHasLineageStatus(AtlasVertex assetVertex, AtlasEdge curr
18411841
}
18421842

18431843
}
1844-
private void updateAssetHasLineageStatusV1(AtlasVertex assetVertex, AtlasEdge currentEdge, Collection<AtlasEdge> removedEdges) {
1845-
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatusV1");
18461844

1847-
removedEdges.forEach(edge -> RequestContext.get().addToDeletedEdgesIdsForResetHasLineage(edge.getIdForDisplay()));
1845+
private boolean updateAssetHasLineageStatusWithDirection(AtlasVertex assetVertex, AtlasEdge currentEdge, AtlasEdgeDirection direction, Set<String> exclusionList) {
18481846

18491847
Iterator<AtlasEdge> edgeIterator = assetVertex.query()
1850-
.direction(AtlasEdgeDirection.BOTH)
1848+
.direction(direction)
18511849
.label(PROCESS_EDGE_LABELS)
18521850
.has(STATE_PROPERTY_KEY, ACTIVE.name())
18531851
.edges()
@@ -1857,7 +1855,7 @@ private void updateAssetHasLineageStatusV1(AtlasVertex assetVertex, AtlasEdge cu
18571855

18581856
while (edgeIterator.hasNext()) {
18591857
AtlasEdge edge = edgeIterator.next();
1860-
if (!RequestContext.get().getDeletedEdgesIdsForResetHasLineage().contains(edge.getIdForDisplay()) && !currentEdge.equals(edge)) {
1858+
if (!exclusionList.contains(edge.getIdForDisplay()) && !currentEdge.equals(edge)) {
18611859
AtlasVertex relatedProcessVertex = edge.getOutVertex();
18621860
boolean processHasLineage = getEntityHasLineage(relatedProcessVertex);
18631861
if (processHasLineage) {
@@ -1866,8 +1864,41 @@ private void updateAssetHasLineageStatusV1(AtlasVertex assetVertex, AtlasEdge cu
18661864
}
18671865
}
18681866
}
1867+
return processHasLineageCount > 0;
1868+
}
18691869

1870-
if (processHasLineageCount == 0) {
1870+
private boolean updateAssetHasLineageStatusWithOUTDirection(AtlasVertex assetVertex, AtlasEdge currentEdge, Set<String> exclusionList) {
1871+
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatusWithOUTDirection");
1872+
boolean hasLineage = updateAssetHasLineageStatusWithDirection(assetVertex, currentEdge, AtlasEdgeDirection.OUT, exclusionList);
1873+
RequestContext.get().endMetricRecord(metricRecorder);
1874+
return hasLineage;
1875+
}
1876+
1877+
private boolean updateAssetHasLineageStatusWithINDirection(AtlasVertex assetVertex, AtlasEdge currentEdge, Set<String> exclusionList) {
1878+
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatusWithINDirection");
1879+
boolean hasLineage = updateAssetHasLineageStatusWithDirection(assetVertex, currentEdge, AtlasEdgeDirection.IN, exclusionList);
1880+
RequestContext.get().endMetricRecord(metricRecorder);
1881+
return hasLineage;
1882+
}
1883+
1884+
private void updateAssetHasLineageStatusV1(AtlasVertex assetVertex, AtlasEdge currentEdge, Collection<AtlasEdge> removedEdges) {
1885+
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("updateAssetHasLineageStatusV1");
1886+
1887+
// Add removed edges to the context
1888+
removedEdges.forEach(edge -> RequestContext.get().addToDeletedEdgesIdsForResetHasLineage(edge.getIdForDisplay()));
1889+
Set<String> exclusionList = RequestContext.get().getDeletedEdgesIdsForResetHasLineage();
1890+
1891+
// First check in OUT direction
1892+
boolean hasActiveLineage = updateAssetHasLineageStatusWithOUTDirection(assetVertex, currentEdge, exclusionList);
1893+
1894+
// If no active lineage found in OUT direction, check IN direction
1895+
if (!hasActiveLineage) {
1896+
hasActiveLineage = updateAssetHasLineageStatusWithINDirection(assetVertex, currentEdge, exclusionList);
1897+
}
1898+
1899+
if (hasActiveLineage) {
1900+
AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, true);
1901+
} else {
18711902
AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, false);
18721903
}
18731904

repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ public EntityMutationResponse mapAttributesAndClassifications(EntityMutationCont
377377

378378
setCustomAttributes(vertex, createdEntity);
379379
setSystemAttributesToEntity(vertex, createdEntity);
380-
resp.addEntity(CREATE, constructHeader(createdEntity, vertex, entityType.getAllAttributes()));
380+
resp.addEntity(CREATE, constructHeader(createdEntity, vertex, entityType));
381381

382382
if (bulkRequestContext.isAppendTags()) {
383383
if (CollectionUtils.isNotEmpty(createdEntity.getAddOrUpdateClassifications())) {
@@ -489,7 +489,7 @@ public EntityMutationResponse mapAttributesAndClassifications(EntityMutationCont
489489
}
490490

491491
setSystemAttributesToEntity(vertex, updatedEntity);
492-
resp.addEntity(updateType, constructHeader(updatedEntity, vertex, entityType.getAllAttributes()));
492+
resp.addEntity(updateType, constructHeader(updatedEntity, vertex, entityType));
493493

494494
// Add hasLineage for newly created edges
495495
Set<AtlasEdge> newlyCreatedEdges = getNewCreatedInputOutputEdges(guid);
@@ -3621,7 +3621,8 @@ private Set<AtlasEdge> getRemovedInputOutputEdges(String guid) {
36213621
}
36223622

36233623

3624-
private AtlasEntityHeader constructHeader(AtlasEntity entity, AtlasVertex vertex, Map<String, AtlasAttribute> attributeMap ) throws AtlasBaseException {
3624+
private AtlasEntityHeader constructHeader(AtlasEntity entity, AtlasVertex vertex, AtlasEntityType entityType) throws AtlasBaseException {
3625+
Map<String, AtlasAttribute> attributeMap = entityType.getAllAttributes();
36253626
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex, attributeMap.keySet());
36263627
if (entity.getClassifications() == null) {
36273628
entity.setClassifications(header.getClassifications());
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.apache.atlas.repository.store.graph.v2.repair;
2+
3+
import org.apache.atlas.exception.AtlasBaseException;
4+
import org.apache.atlas.AtlasErrorCode;
5+
import org.apache.commons.collections.CollectionUtils;
6+
import org.apache.commons.lang3.StringUtils;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.stereotype.Component;
10+
11+
import javax.inject.Inject;
12+
import java.util.Set;
13+
14+
@Component
15+
public class AtlasRepairAttributeService {
16+
17+
private static final Logger LOG = LoggerFactory.getLogger(AtlasRepairAttributeService.class);
18+
19+
private final RepairAttributeFactory repairAttributeFactory;
20+
21+
@Inject
22+
public AtlasRepairAttributeService(RepairAttributeFactory repairAttributeFactory) {
23+
this.repairAttributeFactory = repairAttributeFactory;
24+
}
25+
26+
public void repairAttributes(String attributeName, String repairType, Set<String> entityGuids)
27+
throws AtlasBaseException {
28+
29+
validateRequest(attributeName, repairType, entityGuids);
30+
31+
LOG.info("Starting attribute repair - Type: {}, Attribute: {}, Entities: {}",
32+
repairType, attributeName, entityGuids.size());
33+
34+
try {
35+
AtlasRepairAttributeStrategy strategy = repairAttributeFactory.getStrategy(repairType, entityGuids);
36+
37+
strategy.validate(entityGuids, attributeName);
38+
strategy.repair(entityGuids, attributeName);
39+
40+
LOG.info("Successfully completed attribute repair - Type: {}, Attribute: {}, Entities: {}",
41+
repairType, attributeName, entityGuids.size());
42+
43+
} catch (Exception e) {
44+
LOG.error("Error during attribute repair - Type: {}, Attribute: {}, Entities: {}",
45+
repairType, attributeName, entityGuids.size(), e);
46+
throw e;
47+
}
48+
}
49+
50+
private void validateRequest(String attributeName, String repairType, Set<String> entityGuids)
51+
throws AtlasBaseException {
52+
53+
if (StringUtils.isEmpty(attributeName)) {
54+
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Attribute name cannot be empty");
55+
}
56+
57+
if (StringUtils.isEmpty(repairType)) {
58+
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Repair type cannot be empty");
59+
}
60+
61+
if (CollectionUtils.isEmpty(entityGuids)) {
62+
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Entity GUIDs cannot be empty");
63+
}
64+
65+
if (entityGuids.size() > 1000) {
66+
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST,
67+
"Too many entities. Maximum allowed: 1000, provided: " + entityGuids.size());
68+
}
69+
}
70+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.apache.atlas.repository.store.graph.v2.repair;
2+
3+
4+
import org.apache.atlas.exception.AtlasBaseException;
5+
6+
import java.util.Set;
7+
8+
public interface AtlasRepairAttributeStrategy {
9+
10+
String getRepairType();
11+
void repair(Set<String> entityGuids, String attributeName) throws AtlasBaseException;
12+
void validate (Set<String> entityGuids, String attributeName) throws AtlasBaseException;
13+
14+
}
15+
16+

0 commit comments

Comments
 (0)