Skip to content

Commit 7e06e58

Browse files
authored
Merge pull request #5380 from atlanhq/mlh-1240-custom-metadata-consistency
Mlh 1240 custom metadata consistency
2 parents e0f2a9b + 0cf1155 commit 7e06e58

File tree

12 files changed

+474
-352
lines changed

12 files changed

+474
-352
lines changed

.github/workflows/maven.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ on:
3131
- tagscanarymerge
3232
- fixlabels
3333
- interceptapis
34+
- mlh-1240-custom-metadata-consistency
3435

3536
jobs:
3637
build:

common/src/main/java/org/apache/atlas/repository/Constants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,13 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
538538
add(STAKEHOLDER_TITLE_ENTITY_TYPE);
539539
}};
540540

541+
public static final String TYPEDEF_ENUM_CACHE_LATEST_VERSION = "typdef.enum.cache.version";
542+
public static final String TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION = "typdef.bm.cache.version";
543+
public static final String TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION = "typdef.cls.cache.version";
544+
public static final String TYPEDEF_STRUCT_CACHE_LATEST_VERSION = "typdef.struct.cache.version";
545+
public static final String TYPEDEF_ENTITY_CACHE_LATEST_VERSION = "typdef.entity.cache.version";
546+
public static final String TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION = "typdef.relationship.cache.version";
547+
541548
private Constants() {
542549
}
543550

common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.apache.atlas.service.metrics.MetricUtils;
66
import org.apache.commons.configuration.Configuration;
77
import org.apache.commons.lang.ArrayUtils;
8+
import org.apache.commons.lang.StringUtils;
89
import org.redisson.api.RLock;
910
import org.redisson.api.RedissonClient;
1011
import org.redisson.config.Config;
@@ -191,6 +192,22 @@ public String getValue(String key) {
191192
}
192193
}
193194

195+
@Override
196+
public String getValue(String key, String defaultValue) {
197+
try {
198+
String value = getValue(key);
199+
if (StringUtils.isEmpty(value)) {
200+
return defaultValue;
201+
} else {
202+
return value;
203+
}
204+
} catch (Exception e) {
205+
MetricUtils.recordRedisConnectionFailure();
206+
getLogger().error("Redis getValue operation failed for key: {}", key, e);
207+
throw e;
208+
}
209+
}
210+
194211
@Override
195212
public String putValue(String key, String value) {
196213
try {

common/src/main/java/org/apache/atlas/service/redis/RedisService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public interface RedisService {
1616

1717
String getValue(String key);
1818

19+
String getValue(String key, String defaultValue);
20+
1921
String putValue(String key, String value);
2022

2123
String putValue(String key, String value, int timeout);

common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public String getValue(String key) {
2727
return super.getValue(key);
2828
}
2929

30+
@Override
31+
public String getValue(String key, String defaultValue) {
32+
return null;
33+
}
34+
3035
@Override
3136
public String putValue(String key, String value, int timeout) {
3237
return super.putValue(key, value, timeout);

repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java

Lines changed: 167 additions & 156 deletions
Large diffs are not rendered by default.

repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.atlas.repository.patches.SuperTypesUpdatePatch;
5151
import org.apache.atlas.repository.patches.AtlasPatchManager;
5252
import org.apache.atlas.repository.patches.AtlasPatchRegistry;
53+
import org.apache.atlas.service.redis.RedisService;
5354
import org.apache.atlas.store.AtlasTypeDefStore;
5455
import org.apache.atlas.type.AtlasEntityType;
5556
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -101,15 +102,23 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
101102
private final Configuration conf;
102103
private final AtlasGraph graph;
103104
private final AtlasPatchManager patchManager;
105+
private final RedisService redisService;
106+
private static long CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION;
107+
private static long CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION;
108+
private static long CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION;
109+
private static long CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION;
110+
private static long CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION;
111+
private static long CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION;
104112

105113
@Inject
106114
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
107-
AtlasGraph graph, Configuration conf, AtlasPatchManager patchManager) throws AtlasBaseException {
115+
AtlasGraph graph, Configuration conf, AtlasPatchManager patchManager, RedisService redisService) throws AtlasBaseException {
108116
this.typeDefStore = typeDefStore;
109117
this.typeRegistry = typeRegistry;
110118
this.conf = conf;
111119
this.graph = graph;
112120
this.patchManager = patchManager;
121+
this.redisService = redisService;
113122
}
114123

115124
@PostConstruct
@@ -118,6 +127,12 @@ public void init() {
118127

119128
if (!HAConfiguration.isHAEnabled(conf)) {
120129
startInternal();
130+
CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1"));
131+
CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1"));
132+
CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1"));
133+
CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_STRUCT_CACHE_LATEST_VERSION, "1"));
134+
CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENTITY_CACHE_LATEST_VERSION, "1"));
135+
CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION, "1"));
121136
} else {
122137
LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation");
123138
}
@@ -407,6 +422,54 @@ public int getHandlerOrder() {
407422
return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder();
408423
}
409424

425+
public static long getCurrentEnumTypedefInternalVersion() {
426+
return CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION;
427+
}
428+
429+
public static void setCurrentEnumTypedefInternalVersion(long version) {
430+
CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = version;
431+
}
432+
433+
public static long getCurrentBMTypedefInternalVersion() {
434+
return CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION;
435+
}
436+
437+
public static void setCurrentBMTypedefInternalVersion(long version) {
438+
CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = version;
439+
}
440+
441+
public static long getCurrentClassificationTypedefInternalVersion() {
442+
return CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION;
443+
}
444+
445+
public static void setCurrentClassificationTypedefInternalVersion(long version) {
446+
CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = version;
447+
}
448+
449+
public static long getCurrentStructTypedefInternalVersion() {
450+
return CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION;
451+
}
452+
453+
public static void setCurrentStructTypedefInternalVersion(long version) {
454+
CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION = version;
455+
}
456+
457+
public static long getCurrentEntityTypedefInternalVersion() {
458+
return CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION;
459+
}
460+
461+
public static void setCurrentEntityTypedefInternalVersion(long version) {
462+
CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION = version;
463+
}
464+
465+
public static long getCurrentRelationshipTypedefInternalVersion() {
466+
return CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION;
467+
}
468+
469+
public static void setCurrentRelationshipTypedefInternalVersion(long version) {
470+
CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION = version;
471+
}
472+
410473
private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) {
411474
boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion);
412475

0 commit comments

Comments
 (0)