From f070e24057e57465d5ec3ebb7d5848335270b2cd Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 15 Sep 2025 15:20:10 +0530 Subject: [PATCH 01/16] MLH-1240 improve refresh logic, reduce retry count and wait time --- .github/workflows/maven.yml | 1 + .../org/apache/atlas/web/rest/TypeCacheRefreshREST.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 5395dd76bb..4a50398150 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -31,6 +31,7 @@ on: - tagscanarymerge - fixlabels - interceptapis + - mlh-1240-custom-metadata-consistency jobs: build: diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java index efccf0828d..534f52192b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java @@ -72,12 +72,12 @@ private synchronized void refreshTypeDef(int expectedFieldKeys,final String trac int currentSize = provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size(); LOG.info("Size of field keys before refresh = {} :: traceId {}", currentSize,traceId); - long totalWaitTimeInMillis = 15 * 1000;//15 seconds + long totalWaitTimeInMillis = 5 * 1000;//5 seconds long sleepTimeInMillis = 500; - long totalIterationsAllowed = Math.floorDiv(totalWaitTimeInMillis, sleepTimeInMillis); + long totalIterationsAllowed = 5; int counter = 0; - while (currentSize != expectedFieldKeys && counter++ < totalIterationsAllowed) { + while (currentSize < expectedFieldKeys && counter++ < totalIterationsAllowed) { currentSize = provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size(); LOG.info("field keys size found = {} at iteration {} :: traceId {}", currentSize, counter, traceId); Thread.sleep(sleepTimeInMillis); From 3a6b41a22e6d4111b3044a4fe90688fee01aaf89 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 15 Sep 2025 15:57:32 +0530 Subject: [PATCH 02/16] MLH-1240 use version number to decide to invoke typedef cache refresh on-demand --- .../apache/atlas/repository/Constants.java | 2 + .../service/redis/AbstractRedisService.java | 17 ++ .../atlas/service/redis/RedisService.java | 2 + .../service/redis/RedisServiceLocalImpl.java | 5 + .../repository/graph/TypeCacheRefresher.java | 183 ++---------------- .../AtlasTypeDefStoreInitializer.java | 11 +- .../org/apache/atlas/web/rest/TypesREST.java | 106 +--------- 7 files changed, 60 insertions(+), 266 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index e8a8797669..01ab6c897a 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -538,6 +538,8 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV } add(STAKEHOLDER_TITLE_ENTITY_TYPE); }}; + public static final String TYPEDEF_CACHE_LATEST_VERSION = "typdef.cache.version"; + private Constants() { } diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index e729643d7e..3a24e078d3 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -5,6 +5,7 @@ import org.apache.atlas.service.metrics.MetricUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; @@ -191,6 +192,22 @@ public String getValue(String key) { } } + @Override + public String getValue(String key, String defaultValue) { + try { + String value = (String) redisCacheClient.getBucket(convertToNamespace(key)).get(); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } else { + return value; + } + } catch (Exception e) { + MetricUtils.recordRedisConnectionFailure(); + getLogger().error("Redis getValue operation failed for key: {}", key, e); + throw e; + } + } + @Override public String putValue(String key, String value) { try { diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisService.java b/common/src/main/java/org/apache/atlas/service/redis/RedisService.java index 083793ea71..ae395c5dc3 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisService.java @@ -16,6 +16,8 @@ public interface RedisService { String getValue(String key); + String getValue(String key, String defaultValue); + String putValue(String key, String value); String putValue(String key, String value, int timeout); diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java index c69a151a7d..734904047d 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java @@ -27,6 +27,11 @@ public String getValue(String key) { return null; } + @Override + public String getValue(String key, String defaultValue) { + return null; + } + @Override public String putValue(String key, String value, int timeout) { return null; diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index ac80b4b6ca..64bbf2e470 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -1,190 +1,37 @@ package org.apache.atlas.repository.graph; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.ha.HAConfiguration; -import org.apache.atlas.repository.RepositoryException; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang.StringUtils; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; +import org.apache.atlas.service.redis.RedisService; +import org.apache.atlas.store.AtlasTypeDefStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import javax.inject.Inject; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.*; - -import static org.apache.atlas.AtlasErrorCode.CINV_UNHEALTHY; -import static org.apache.atlas.repository.Constants.VERTEX_INDEX; @Component public class TypeCacheRefresher { private static final Logger LOG = LoggerFactory.getLogger(TypeCacheRefresher.class); - private String cacheRefresherEndpoint; - private String cacheRefresherHealthEndpoint; - private final IAtlasGraphProvider provider; - private boolean isActiveActiveHAEnabled; + private final AtlasTypeDefStore typeDefStore; @Inject - public TypeCacheRefresher(final IAtlasGraphProvider provider) { - this.provider = provider; - } - - @PostConstruct - public void init() throws AtlasException { - Configuration configuration = ApplicationProperties.get(); - this.cacheRefresherEndpoint = configuration.getString("atlas.server.type.cache-refresher"); - this.cacheRefresherHealthEndpoint = configuration.getString("atlas.server.type.cache-refresher-health"); - this.isActiveActiveHAEnabled = HAConfiguration.isActiveActiveHAEnabled(configuration); - LOG.info("Found {} as cache-refresher endpoint", cacheRefresherEndpoint); - LOG.info("Found {} as cache-refresher-health endpoint", cacheRefresherHealthEndpoint); - } - - public void verifyCacheRefresherHealth() throws AtlasBaseException, IOException { - if (StringUtils.isBlank(cacheRefresherHealthEndpoint) || !isActiveActiveHAEnabled) { - LOG.info("Skipping type-def cache refresher health checking as URL is {} and isActiveActiveHAEnabled is {}", cacheRefresherHealthEndpoint, isActiveActiveHAEnabled); - return; - } - final String healthResponseBody; - try (CloseableHttpClient client = HttpClients.createDefault()) { - final HttpGet healthRequest = new HttpGet(cacheRefresherHealthEndpoint); - healthResponseBody = executeGet(client, healthRequest); - } - LOG.debug("Response Body from cache-refresh-health = {}", healthResponseBody); - final ObjectMapper mapper = new ObjectMapper(); - final CacheRefresherHealthResponse jsonResponse = mapper.readValue(healthResponseBody, CacheRefresherHealthResponse.class); - if (!"Healthy".equalsIgnoreCase(jsonResponse.getMessage())) { - throw new AtlasBaseException(CINV_UNHEALTHY); - } - } - - public void refreshAllHostCache() throws IOException, URISyntaxException, RepositoryException { - final String traceId = RequestContext.get().getTraceId(); - if(StringUtils.isBlank(cacheRefresherEndpoint) || !isActiveActiveHAEnabled) { - LOG.info("Skipping type-def cache refresh :: traceId {}", traceId); - return; - } - - int totalFieldKeys = provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size(); - LOG.info("Found {} totalFieldKeys to be expected in other nodes :: traceId {}", totalFieldKeys, traceId); - refreshCache(totalFieldKeys, traceId); + public TypeCacheRefresher(final AtlasTypeDefStore typeDefStore) { + this.typeDefStore = typeDefStore; } - private void refreshCache(final int totalFieldKeys, final String traceId) throws IOException, URISyntaxException { - URIBuilder builder = new URIBuilder(cacheRefresherEndpoint); - builder.setParameter("expectedFieldKeys", String.valueOf(totalFieldKeys)); - builder.setParameter("traceId", traceId); - final HttpPost httpPost = new HttpPost(builder.build()); - LOG.info("Invoking cache refresh endpoint {} :: traceId {}", cacheRefresherEndpoint, traceId); - - String responseBody; - try (CloseableHttpClient client = HttpClients.createDefault()) { - responseBody = executePost(traceId, client, httpPost); - } - LOG.info("Response Body from cache-refresh = {} :: traceId {}", responseBody, traceId); - CacheRefreshResponseEnvelope cacheRefreshResponseEnvelope = convertStringToObject(responseBody); - - for (CacheRefreshResponse responseOfEachNode : cacheRefreshResponseEnvelope.getResponse()) { - if (responseOfEachNode.getStatus() != 204) { - //Do not throw exception in this case as node must have been in passive state now - LOG.error("Error while performing cache refresh on host {} . HTTP code = {} :: traceId {}", responseOfEachNode.getHost(), - responseOfEachNode.getStatus(), traceId); - } else { - LOG.info("Host {} returns response code {} :: traceId {}", responseOfEachNode.getHost(), responseOfEachNode.getStatus(), traceId); - } - } - LOG.info("Refreshed cache successfully on all hosts :: traceId {}", traceId); + public boolean isCacheRefreshNeeded(RedisService redisService) { + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); + LOG.debug("Current Redis typedef version: {}, Latest typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion()); + return AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion() != currentRedisVersion; } - private String executePost(String traceId, CloseableHttpClient client, HttpPost httpPost) throws IOException { - try (CloseableHttpResponse response = client.execute(httpPost)) { - LOG.info("Received HTTP response code {} from cache refresh endpoint :: traceId {}", response.getStatusLine().getStatusCode(), traceId); - if (response.getStatusLine().getStatusCode() != 200) { - throw new RuntimeException("Error while calling cache-refresher on host " + cacheRefresherEndpoint + ". HTTP code = " + response.getStatusLine().getStatusCode() + " :: traceId " + traceId); - } - return EntityUtils.toString(response.getEntity()); + public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { + if (isCacheRefreshNeeded(redisService)) { + LOG.info("Refreshing type-def cache as the version is different from latest"); + typeDefStore.initWithoutLock(); } } - private String executeGet(CloseableHttpClient client, HttpGet getRequest) throws IOException, AtlasBaseException { - try (CloseableHttpResponse closeableHttpResponse = client.execute(getRequest)) { - LOG.info("Received HTTP response code {} from cache refresh health endpoint", closeableHttpResponse.getStatusLine().getStatusCode()); - if (closeableHttpResponse.getStatusLine().getStatusCode() != 200) { - throw new AtlasBaseException(CINV_UNHEALTHY); - } - return EntityUtils.toString(closeableHttpResponse.getEntity()); - } - } - - private CacheRefreshResponseEnvelope convertStringToObject(final String responseBody) throws JsonProcessingException { - final ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(responseBody, CacheRefreshResponseEnvelope.class); - } -} - -class CacheRefreshResponseEnvelope { - private List response; - - public List getResponse() { - return response; - } - - public void setResponse(List response) { - this.response = response; - } -} - -class CacheRefreshResponse { - private String host; - private int status; - private Map headers; - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public Map getHeaders() { - return headers; - } - - public void setHeaders(Map headers) { - this.headers = headers; - } -} - -class CacheRefresherHealthResponse { - private String message; - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 2a867452b6..a493b0adbc 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -50,6 +50,7 @@ import org.apache.atlas.repository.patches.SuperTypesUpdatePatch; import org.apache.atlas.repository.patches.AtlasPatchManager; import org.apache.atlas.repository.patches.AtlasPatchRegistry; +import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; @@ -101,15 +102,18 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private final Configuration conf; private final AtlasGraph graph; private final AtlasPatchManager patchManager; + private final RedisService redisService; + private static long CURRENT_TYPEDEF_INTERNAL_VERSION; @Inject public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, - AtlasGraph graph, Configuration conf, AtlasPatchManager patchManager) throws AtlasBaseException { + AtlasGraph graph, Configuration conf, AtlasPatchManager patchManager, RedisService redisService) throws AtlasBaseException { this.typeDefStore = typeDefStore; this.typeRegistry = typeRegistry; this.conf = conf; this.graph = graph; this.patchManager = patchManager; + this.redisService = redisService; } @PostConstruct @@ -118,6 +122,7 @@ public void init() { if (!HAConfiguration.isHAEnabled(conf)) { startInternal(); + CURRENT_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); } else { LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation"); } @@ -407,6 +412,10 @@ public int getHandlerOrder() { return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder(); } + public static long getCurrentTypedefInternalVersion() { + return CURRENT_TYPEDEF_INTERNAL_VERSION; + } + private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index 89caf89f49..f2627cc1f5 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -60,7 +60,7 @@ import static org.apache.atlas.AtlasErrorCode.APPLICABLE_ENTITY_TYPES_DELETION_NOT_SUPPORTED; import static org.apache.atlas.AtlasErrorCode.ATTRIBUTE_DELETION_NOT_SUPPORTED; -import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery.CLIENT_ORIGIN_PRODUCT; +import static org.apache.atlas.repository.Constants.TYPEDEF_CACHE_LATEST_VERSION; import static org.apache.atlas.web.filters.AuditFilter.X_ATLAN_CLIENT_ORIGIN; /** @@ -460,7 +460,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ validateBuiltInTypeNames(typesDef); RequestContext.get().setTraceId(UUID.randomUUID().toString()); try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.createAtlasTypeDefs(" + AtlasTypeUtil.toDebugString(typesDef) + ")"); @@ -481,6 +481,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ } finally { if (lock != null) { + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -503,11 +504,10 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ @QueryParam("allowDuplicateDisplayName") @DefaultValue("false") boolean allowDuplicateDisplayName) throws AtlasBaseException { AtlasPerfTracer perf = null; validateBuiltInTypeNames(typesDef); - validateTypeNameExists(typesDef); RequestContext.get().setTraceId(UUID.randomUUID().toString()); Lock lock = null; try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.updateAtlasTypeDefs(" + AtlasTypeUtil.toDebugString(typesDef) + ")"); @@ -548,6 +548,7 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ throw new AtlasBaseException("Error while updating a type definition"); } finally { if (lock != null) { + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); redisService.releaseDistributedLockV2(lock, typeDefLock); } RequestContext.clear(); @@ -571,9 +572,8 @@ public void deleteAtlasTypeDefs(@Context HttpServletRequest servletRequest, fina Lock lock = null; RequestContext.get().setTraceId(UUID.randomUUID().toString()); validateBuiltInTypeNames(typesDef); - validateTypeNameExists(typesDef); try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.deleteAtlasTypeDefs(" + AtlasTypeUtil.toDebugString(typesDef) + ")"); @@ -589,6 +589,7 @@ public void deleteAtlasTypeDefs(@Context HttpServletRequest servletRequest, fina throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -610,7 +611,7 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) Lock lock = null; RequestContext.get().setTraceId(UUID.randomUUID().toString()); try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.deleteAtlasTypeByName(" + typeName + ")"); } @@ -624,6 +625,7 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -648,45 +650,6 @@ private void validateBuiltInTypeNames(AtlasTypesDef typesDef) throws AtlasBaseEx } } - private void validateTypeNames(AtlasTypesDef typesDef) throws AtlasBaseException { - if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getEnumDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getEntityDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getStructDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getClassificationDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getBusinessMetadataDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getBusinessMetadataDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - } - - private void validateTypeNameExists(AtlasTypesDef typesDef) throws AtlasBaseException { - try { - validateTypeNames(typesDef); - } catch (AtlasBaseException e) { - if(AtlasErrorCode.TYPE_NAME_NOT_FOUND.equals(e.getAtlasErrorCode())) { - typeDefStore.initWithoutLock(); - validateTypeNames(typesDef); - } - } - } - /** * Populate a SearchFilter on the basis of the Query Parameters * @return @@ -717,53 +680,6 @@ private SearchFilter getSearchFilter(HttpServletRequest httpServletRequest) { return ret; } - private void refreshAllHostCache(String traceId, String clientOrigin){ - if (CLIENT_ORIGIN_PRODUCT.equals(clientOrigin) && AtlasConfiguration.ENABLE_ASYNC_TYPE_UPDATE.getBoolean()){ - cacheRefreshExecutor.submit(() -> { - RequestContext.get().setTraceId(traceId); - try { - int maxRetries = 5; - int retryCount = 0; - boolean success = false; - - while (!success && retryCount < maxRetries) { - try { - typeCacheRefresher.refreshAllHostCache(); - LOG.info("TypesRest.updateAtlasTypeDefs:: Typedef refreshed successfully on attempt {}", retryCount + 1); - success = true; - } catch (IOException | URISyntaxException | RepositoryException e) { - retryCount++; - if (retryCount >= maxRetries) { - LOG.error("TypesRest.updateAtlasTypeDefs:: Failed to refresh typedef after {} attempts", maxRetries, e); - break; - } - - // Exponential backoff: wait longer between each retry - long waitTimeMs = 1000 * (long)Math.pow(2, retryCount - 1); - LOG.warn("TypesRest.updateAtlasTypeDefs:: Retry attempt {} after {} ms", retryCount, waitTimeMs); - - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("Retry interrupted", ie); - break; - } - } - } - } catch (Exception e) { - LOG.error("Unexpected error in async cache refresh", e); - } - }); - } else { - try { - typeCacheRefresher.refreshAllHostCache(); - } catch (IOException | URISyntaxException | RepositoryException e) { - LOG.error("Error while refreshing all host cache", e); - } - } - } - @PreDestroy public void cleanUp() { if (cacheRefreshExecutor != null) { @@ -794,7 +710,6 @@ private AtlasTypesDef createTypeDefsWithRetry(AtlasTypesDef typesDef, String cli // Perform the creation AtlasTypesDef result = typeDefStore.createTypesDef(typesDef); - refreshAllHostCache(RequestContext.get().getTraceId(), clientOrigin); LOG.info("Successfully created typedefs on attempt {}", attempt); return result; @@ -841,7 +756,6 @@ private AtlasTypesDef updateTypeDefsWithRetry(AtlasTypesDef typesDef, String cli } AtlasTypesDef result = typeDefStore.updateTypesDef(typesDef); - refreshAllHostCache(RequestContext.get().getTraceId(), clientOrigin); LOG.info("Successfully updated typedefs on attempt {}", attempt); return result; @@ -883,7 +797,6 @@ private void deleteTypeDefsWithRetry(AtlasTypesDef typesDef, String clientOrigin try { // Perform the deletion typeDefStore.deleteTypesDef(typesDef); - refreshAllHostCache(RequestContext.get().getTraceId(), clientOrigin); LOG.info("Successfully deleted typedefs on attempt {}", attempt); return; @@ -922,7 +835,6 @@ private void deleteTypeByNameWithRetry(String typeName) throws AtlasBaseExceptio try { // Perform the deletion typeDefStore.deleteTypeByName(typeName); - refreshAllHostCache(RequestContext.get().getTraceId(), null); LOG.info("Successfully deleted typedef '{}' on attempt {}", typeName, attempt); return; From 41df3cd9c160806a81bc5ffca2f77991e7620b19 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 15 Sep 2025 22:52:03 +0530 Subject: [PATCH 03/16] MLH-1240 reload only business metadata --- .../store/graph/AtlasTypeDefGraphStore.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index cdc06fbc08..c010611a85 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -124,16 +124,18 @@ public void init() throws AtlasBaseException { @Override public void initWithoutLock() throws AtlasBaseException { - // need even better approach than this - AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(typeRegistry).getAll(), - getStructDefStore(typeRegistry).getAll(), - getClassificationDefStore(typeRegistry).getAll(), - getEntityDefStore(typeRegistry).getAll(), - getRelationshipDefStore(typeRegistry).getAll(), - getBusinessMetadataDefStore(typeRegistry).getAll()); - - rectifyTypeErrorsIfAny(typesDef); - typeRegistry.addTypes(typesDef); + LOG.info("==> AtlasTypeDefGraphStore.initWithoutLock()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List typesDefs = getBusinessMetadataDefStore(ttr).getAll(); + ttr.updateTypes(typesDefs); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.initWithoutLock()"); + } } @Override From 7c3e04e8d0540d93de19b4a3bf21a0f87c32205a Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 15 Sep 2025 23:00:52 +0530 Subject: [PATCH 04/16] MLH-1240 increment internal cache version --- .../org/apache/atlas/repository/graph/TypeCacheRefresher.java | 1 + .../store/bootstrap/AtlasTypeDefStoreInitializer.java | 4 ++++ webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java | 3 +-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 64bbf2e470..208c017405 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -31,6 +31,7 @@ public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseExce if (isCacheRefreshNeeded(redisService)) { LOG.info("Refreshing type-def cache as the version is different from latest"); typeDefStore.initWithoutLock(); + AtlasTypeDefStoreInitializer.incrCurrentTypedefInternalVersion(); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index a493b0adbc..4498923b98 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -416,6 +416,10 @@ public static long getCurrentTypedefInternalVersion() { return CURRENT_TYPEDEF_INTERNAL_VERSION; } + public static void incrCurrentTypedefInternalVersion() { + CURRENT_TYPEDEF_INTERNAL_VERSION++; + } + private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index f2627cc1f5..54be92dc43 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -25,7 +25,6 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.typedef.*; -import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.graph.TypeCacheRefresher; import org.apache.atlas.repository.util.FilterUtil; import org.apache.atlas.service.redis.RedisService; @@ -395,7 +394,7 @@ public AtlasBusinessMetadataDef getBusinessMetadataDefByGuid(@PathParam("guid") @Timed public AtlasBusinessMetadataDef getBusinessMetadataDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBusinessMetadataDef ret = typeDefStore.getBusinessMetadataDefByName(name); return ret; From 248ea12900bef759f59a88d61472106fd6e42ad6 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Mon, 15 Sep 2025 23:36:02 +0530 Subject: [PATCH 05/16] MLH-1240 reload business and classification typedef --- .../apache/atlas/repository/graph/TypeCacheRefresher.java | 2 +- .../repository/store/graph/AtlasTypeDefGraphStore.java | 8 +++++--- .../store/graph/v2/AtlasTypeDefGraphStoreV2.java | 4 ++-- .../java/org/apache/atlas/store/AtlasTypeDefStore.java | 2 +- .../org/apache/atlas/web/rest/TypeCacheRefreshREST.java | 2 +- .../main/java/org/apache/atlas/web/rest/TypesREST.java | 2 +- 6 files changed, 11 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 208c017405..20607096d0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -30,7 +30,7 @@ public boolean isCacheRefreshNeeded(RedisService redisService) { public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { if (isCacheRefreshNeeded(redisService)) { LOG.info("Refreshing type-def cache as the version is different from latest"); - typeDefStore.initWithoutLock(); + typeDefStore.reloadCustomTypeDefs(); AtlasTypeDefStoreInitializer.incrCurrentTypedefInternalVersion(); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index c010611a85..438ad23b3b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -123,15 +123,17 @@ public void init() throws AtlasBaseException { } @Override - public void initWithoutLock() throws AtlasBaseException { + public void reloadCustomTypeDefs() throws AtlasBaseException { LOG.info("==> AtlasTypeDefGraphStore.initWithoutLock()"); AtlasTransientTypeRegistry ttr = null; boolean commitUpdates = false; try { ttr = typeRegistry.lockTypeRegistryForUpdate(5); - List typesDefs = getBusinessMetadataDefStore(ttr).getAll(); - ttr.updateTypes(typesDefs); + List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); + List classificationDefs = getClassificationDefStore(ttr).getAll(); + ttr.updateTypes(businessMetadataDefs); + ttr.updateTypes(classificationDefs); } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); LOG.info("<== AtlasTypeDefGraphStore.initWithoutLock()"); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java index a304567f51..6d79789b31 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java @@ -124,10 +124,10 @@ public void init() throws AtlasBaseException { @Override @GraphTransaction - public void initWithoutLock() throws AtlasBaseException { + public void reloadCustomTypeDefs() throws AtlasBaseException { LOG.info("==> AtlasTypeDefGraphStoreV1.initWithoutLock()"); - super.initWithoutLock(); + super.reloadCustomTypeDefs(); LOG.info("<== AtlasTypeDefGraphStoreV1.initWithoutLock()"); } diff --git a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java index 72ada05a78..fdca2e94e5 100644 --- a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java +++ b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java @@ -35,7 +35,7 @@ public interface AtlasTypeDefStore { void init() throws AtlasBaseException; - void initWithoutLock() throws AtlasBaseException; + void reloadCustomTypeDefs() throws AtlasBaseException; /* EnumDef operations */ diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java index 534f52192b..2740fdcd4b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java @@ -91,7 +91,7 @@ private synchronized void refreshTypeDef(int expectedFieldKeys,final String trac LOG.info("Found desired size of fieldKeys in iteration {} :: traceId {}", counter, traceId); } //Reload in-memory cache of type-registry - typeDefStore.initWithoutLock(); + typeDefStore.reloadCustomTypeDefs(); LOG.info("Size of field keys after refresh = {}", provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size()); LOG.info("Completed type-def cache refresh :: traceId {}", traceId); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index 54be92dc43..c21acccd32 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -166,7 +166,7 @@ public List getTypeDefHeaders(@Context HttpServletRequest ht @Timed public AtlasTypesDef getAllTypeDefs(@Context HttpServletRequest httpServletRequest) throws AtlasBaseException { SearchFilter searchFilter = getSearchFilter(httpServletRequest); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasTypesDef typesDef = typeDefStore.searchTypesDef(searchFilter); return typesDef; From 28e84d9477b193ca56993c56cee92954489afb7f Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 16 Sep 2025 01:25:59 +0530 Subject: [PATCH 06/16] MLH-1240 use consistent version and reload if necessary in all typedef endpoints --- .../service/redis/AbstractRedisService.java | 2 +- .../repository/graph/TypeCacheRefresher.java | 3 +- .../AtlasTypeDefStoreInitializer.java | 4 +- .../org/apache/atlas/web/rest/TypesREST.java | 59 ++++++++----------- 4 files changed, 28 insertions(+), 40 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index 3a24e078d3..fc31c71385 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -195,7 +195,7 @@ public String getValue(String key) { @Override public String getValue(String key, String defaultValue) { try { - String value = (String) redisCacheClient.getBucket(convertToNamespace(key)).get(); + String value = getValue(key); if (StringUtils.isEmpty(value)) { return defaultValue; } else { diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 20607096d0..9e8c851396 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -29,9 +29,10 @@ public boolean isCacheRefreshNeeded(RedisService redisService) { public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { if (isCacheRefreshNeeded(redisService)) { + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); LOG.info("Refreshing type-def cache as the version is different from latest"); typeDefStore.reloadCustomTypeDefs(); - AtlasTypeDefStoreInitializer.incrCurrentTypedefInternalVersion(); + AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(currentRedisVersion); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 4498923b98..e345155642 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -416,8 +416,8 @@ public static long getCurrentTypedefInternalVersion() { return CURRENT_TYPEDEF_INTERNAL_VERSION; } - public static void incrCurrentTypedefInternalVersion() { - CURRENT_TYPEDEF_INTERNAL_VERSION++; + public static void setCurrentTypedefInternalVersion(long version) { + CURRENT_TYPEDEF_INTERNAL_VERSION = version; } private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index c21acccd32..a3c629265a 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.typedef.*; import org.apache.atlas.repository.graph.TypeCacheRefresher; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.repository.util.FilterUtil; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.store.AtlasTypeDefStore; @@ -46,8 +47,6 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -113,7 +112,7 @@ public TypesREST(AtlasTypeDefStore typeDefStore, RedisService redisService, Conf @Timed public AtlasBaseTypeDef getTypeDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBaseTypeDef ret = typeDefStore.getByName(name); return ret; @@ -131,7 +130,7 @@ public AtlasBaseTypeDef getTypeDefByName(@PathParam("name") String name) throws @Timed public AtlasBaseTypeDef getTypeDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBaseTypeDef ret = typeDefStore.getByGuid(guid); return ret; @@ -149,7 +148,7 @@ public AtlasBaseTypeDef getTypeDefByGuid(@PathParam("guid") String guid) throws @Timed public List getTypeDefHeaders(@Context HttpServletRequest httpServletRequest) throws AtlasBaseException { SearchFilter searchFilter = getSearchFilter(httpServletRequest); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasTypesDef searchTypesDef = typeDefStore.searchTypesDef(searchFilter); return AtlasTypeUtil.toTypeDefHeader(searchTypesDef); @@ -185,7 +184,6 @@ public AtlasTypesDef getAllTypeDefs(@Context HttpServletRequest httpServletReque @Timed public AtlasEnumDef getEnumDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasEnumDef ret = typeDefStore.getEnumDefByName(name); return ret; @@ -204,7 +202,6 @@ public AtlasEnumDef getEnumDefByName(@PathParam("name") String name) throws Atla @Timed public AtlasEnumDef getEnumDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasEnumDef ret = typeDefStore.getEnumDefByGuid(guid); return ret; @@ -224,7 +221,6 @@ public AtlasEnumDef getEnumDefByGuid(@PathParam("guid") String guid) throws Atla @Timed public AtlasStructDef getStructDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasStructDef ret = typeDefStore.getStructDefByName(name); return ret; @@ -243,7 +239,6 @@ public AtlasStructDef getStructDefByName(@PathParam("name") String name) throws @Timed public AtlasStructDef getStructDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasStructDef ret = typeDefStore.getStructDefByGuid(guid); return ret; @@ -262,7 +257,7 @@ public AtlasStructDef getStructDefByGuid(@PathParam("guid") String guid) throws @Timed public AtlasClassificationDef getClassificationDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasClassificationDef ret = typeDefStore.getClassificationDefByName(name); return ret; @@ -281,7 +276,7 @@ public AtlasClassificationDef getClassificationDefByName(@PathParam("name") Stri @Timed public AtlasClassificationDef getClassificationDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasClassificationDef ret = typeDefStore.getClassificationDefByGuid(guid); return ret; @@ -300,7 +295,6 @@ public AtlasClassificationDef getClassificationDefByGuid(@PathParam("guid") Stri @Timed public AtlasEntityDef getEntityDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasEntityDef ret = typeDefStore.getEntityDefByName(name); return ret; @@ -319,7 +313,6 @@ public AtlasEntityDef getEntityDefByName(@PathParam("name") String name) throws @Timed public AtlasEntityDef getEntityDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasEntityDef ret = typeDefStore.getEntityDefByGuid(guid); return ret; @@ -337,7 +330,6 @@ public AtlasEntityDef getEntityDefByGuid(@PathParam("guid") String guid) throws @Timed public AtlasRelationshipDef getRelationshipDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasRelationshipDef ret = typeDefStore.getRelationshipDefByName(name); return ret; @@ -356,7 +348,6 @@ public AtlasRelationshipDef getRelationshipDefByName(@PathParam("name") String n @Timed public AtlasRelationshipDef getRelationshipDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasRelationshipDef ret = typeDefStore.getRelationshipDefByGuid(guid); return ret; @@ -375,7 +366,7 @@ public AtlasRelationshipDef getRelationshipDefByGuid(@PathParam("guid") String g @Timed public AtlasBusinessMetadataDef getBusinessMetadataDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBusinessMetadataDef ret = typeDefStore.getBusinessMetadataDefByGuid(guid); return ret; @@ -400,22 +391,6 @@ public AtlasBusinessMetadataDef getBusinessMetadataDefByName(@PathParam("name") return ret; } - private void attemptAcquiringLock() throws AtlasBaseException { - final String traceId = RequestContext.get().getTraceId(); - try { - if (!redisService.acquireDistributedLock(typeDefLock)) { - LOG.info("Lock is already acquired. Returning now :: traceId {}", traceId); - throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK); - } - LOG.info("successfully acquired lock :: traceId {}", traceId); - } catch (AtlasBaseException e) { - throw e; - } catch (Exception e) { - LOG.error("Error while acquiring lock on type-defs :: traceId " + traceId + " ." + e.getMessage(), e); - throw new AtlasBaseException("Error while acquiring a lock on type-defs"); - } - } - private Lock attemptAcquiringLockV2() throws AtlasBaseException { final String traceId = RequestContext.get().getTraceId(); Lock lock = null; @@ -480,7 +455,10 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ } finally { if (lock != null) { - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -547,7 +525,10 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ throw new AtlasBaseException("Error while updating a type definition"); } finally { if (lock != null) { - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); redisService.releaseDistributedLockV2(lock, typeDefLock); } RequestContext.clear(); @@ -588,7 +569,10 @@ public void deleteAtlasTypeDefs(@Context HttpServletRequest servletRequest, fina throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -624,7 +608,10 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, String.valueOf(Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "0")) + 1)); + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); From 1a6c6f0a6f2063c801bd90658b2c5b26ae22f371 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 16 Sep 2025 02:34:24 +0530 Subject: [PATCH 07/16] MLH-1240 update cache when current version is less than redis version --- .../atlas/repository/graph/TypeCacheRefresher.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 9e8c851396..17d1b38f31 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -21,19 +21,19 @@ public TypeCacheRefresher(final AtlasTypeDefStore typeDefStore) { this.typeDefStore = typeDefStore; } - public boolean isCacheRefreshNeeded(RedisService redisService) { - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); - LOG.debug("Current Redis typedef version: {}, Latest typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion()); - return AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion() != currentRedisVersion; - } - public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { if (isCacheRefreshNeeded(redisService)) { - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); LOG.info("Refreshing type-def cache as the version is different from latest"); typeDefStore.reloadCustomTypeDefs(); + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(currentRedisVersion); } } + private boolean isCacheRefreshNeeded(RedisService redisService) { + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); + LOG.info("Current Redis typedef version: {}, Latest typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion()); + return AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion() < currentRedisVersion; + } + } \ No newline at end of file From 887ddc65ff006830d536a3c9fce9767bfc0e45c4 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 16 Sep 2025 09:40:29 +0530 Subject: [PATCH 08/16] MLH-1240 refresh custom types always (wip testing) --- .../apache/atlas/repository/graph/TypeCacheRefresher.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 17d1b38f31..56b307fee7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -22,12 +22,13 @@ public TypeCacheRefresher(final AtlasTypeDefStore typeDefStore) { } public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { - if (isCacheRefreshNeeded(redisService)) { + typeDefStore.reloadCustomTypeDefs(); + /*if (isCacheRefreshNeeded(redisService)) { LOG.info("Refreshing type-def cache as the version is different from latest"); typeDefStore.reloadCustomTypeDefs(); long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(currentRedisVersion); - } + }*/ } private boolean isCacheRefreshNeeded(RedisService redisService) { From b42fdbf28b9393d815c2c1dfed48125094e61299 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 16 Sep 2025 13:17:11 +0530 Subject: [PATCH 09/16] MLH-1240 refresh enum and always read from transient registry --- .../store/graph/AtlasTypeDefGraphStore.java | 61 +++++++++++-------- .../graph/v2/AtlasTypeDefGraphStoreV2.java | 8 ++- 2 files changed, 40 insertions(+), 29 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index 438ad23b3b..2486cf73c8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -132,8 +132,10 @@ public void reloadCustomTypeDefs() throws AtlasBaseException { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); List classificationDefs = getClassificationDefStore(ttr).getAll(); + List enumDefs = getEnumDefStore(ttr).getAll(); ttr.updateTypes(businessMetadataDefs); ttr.updateTypes(classificationDefs); + ttr.updateTypes(enumDefs); } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); LOG.info("<== AtlasTypeDefGraphStore.initWithoutLock()"); @@ -747,46 +749,51 @@ public void deleteTypeByName(String typeName) throws AtlasBaseException { public AtlasTypesDef searchTypesDef(SearchFilter searchFilter) throws AtlasBaseException { final AtlasTypesDef typesDef = new AtlasTypesDef(); Predicate searchPredicates = FilterUtil.getPredicateFromSearchFilter(searchFilter); - - for(AtlasEnumType enumType : typeRegistry.getAllEnumTypes()) { - if (searchPredicates.evaluate(enumType)) { - typesDef.getEnumDefs().add(enumType.getEnumDef()); + AtlasTransientTypeRegistry ttr = null; + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + for (AtlasEnumType enumType : ttr.getAllEnumTypes()) { + if (searchPredicates.evaluate(enumType)) { + typesDef.getEnumDefs().add(enumType.getEnumDef()); + } } - } - for(AtlasStructType structType : typeRegistry.getAllStructTypes()) { - if (searchPredicates.evaluate(structType)) { - typesDef.getStructDefs().add(structType.getStructDef()); + for (AtlasStructType structType : ttr.getAllStructTypes()) { + if (searchPredicates.evaluate(structType)) { + typesDef.getStructDefs().add(structType.getStructDef()); + } } - } - for(AtlasClassificationType classificationType : typeRegistry.getAllClassificationTypes()) { - if (searchPredicates.evaluate(classificationType)) { - typesDef.getClassificationDefs().add(classificationType.getClassificationDef()); + for (AtlasClassificationType classificationType : ttr.getAllClassificationTypes()) { + if (searchPredicates.evaluate(classificationType)) { + typesDef.getClassificationDefs().add(classificationType.getClassificationDef()); + } } - } - for(AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) { - if (searchPredicates.evaluate(entityType)) { - typesDef.getEntityDefs().add(entityType.getEntityDef()); + for (AtlasEntityType entityType : ttr.getAllEntityTypes()) { + if (searchPredicates.evaluate(entityType)) { + typesDef.getEntityDefs().add(entityType.getEntityDef()); + } } - } - for(AtlasRelationshipType relationshipType : typeRegistry.getAllRelationshipTypes()) { - if (searchPredicates.evaluate(relationshipType)) { - typesDef.getRelationshipDefs().add(relationshipType.getRelationshipDef()); + for (AtlasRelationshipType relationshipType : ttr.getAllRelationshipTypes()) { + if (searchPredicates.evaluate(relationshipType)) { + typesDef.getRelationshipDefs().add(relationshipType.getRelationshipDef()); + } } - } - for(AtlasBusinessMetadataType businessMetadataType : typeRegistry.getAllBusinessMetadataTypes()) { - if (searchPredicates.evaluate(businessMetadataType)) { - typesDef.getBusinessMetadataDefs().add(businessMetadataType.getBusinessMetadataDef()); + for (AtlasBusinessMetadataType businessMetadataType : ttr.getAllBusinessMetadataTypes()) { + if (searchPredicates.evaluate(businessMetadataType)) { + typesDef.getBusinessMetadataDefs().add(businessMetadataType.getBusinessMetadataDef()); + } } - } - AtlasAuthorizationUtils.filterTypesDef(new AtlasTypesDefFilterRequest(typesDef)); + AtlasAuthorizationUtils.filterTypesDef(new AtlasTypesDefFilterRequest(typesDef)); - return typesDef; + return typesDef; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, false); + } } @Override diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java index 6d79789b31..f19aa226f0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java @@ -22,9 +22,12 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.authorize.AtlasTypesDefFilterRequest; +import org.apache.atlas.authorizer.AtlasAuthorizationUtils; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.TypeDefChangeListener; +import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.typedef.*; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graph.GraphHelper; @@ -34,10 +37,11 @@ import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasDefStore; import org.apache.atlas.repository.store.graph.AtlasTypeDefGraphStore; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.repository.util.FilterUtil; +import org.apache.atlas.type.*; import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.Predicate; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From cb8c0abd6665dc853541b2c1e23f5797b498f5df Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 16 Sep 2025 17:39:46 +0530 Subject: [PATCH 10/16] MLH-1240 track version for all custom types to load them independently --- .../apache/atlas/repository/Constants.java | 4 +- .../repository/graph/TypeCacheRefresher.java | 98 ++++++++++++++++--- .../AtlasTypeDefStoreInitializer.java | 32 ++++-- .../store/graph/AtlasTypeDefGraphStore.java | 43 ++++++-- .../graph/v2/AtlasTypeDefGraphStoreV2.java | 10 -- .../apache/atlas/store/AtlasTypeDefStore.java | 8 +- .../atlas/web/rest/TypeCacheRefreshREST.java | 2 +- .../org/apache/atlas/web/rest/TypesREST.java | 36 +++---- 8 files changed, 172 insertions(+), 61 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 01ab6c897a..d4e334432e 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -538,7 +538,9 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV } add(STAKEHOLDER_TITLE_ENTITY_TYPE); }}; - public static final String TYPEDEF_CACHE_LATEST_VERSION = "typdef.cache.version"; + public static final String TYPEDEF_ENUM_CACHE_LATEST_VERSION = "typdef.enum.cache.version"; + public static final String TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION = "typdef.bm.cache.version"; + public static final String TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION = "typdef.cls.cache.version"; private Constants() { } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 56b307fee7..3596dcd6ba 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -1,16 +1,20 @@ package org.apache.atlas.repository.graph; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.*; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; +import static org.apache.atlas.repository.Constants.*; + @Component public class TypeCacheRefresher { private static final Logger LOG = LoggerFactory.getLogger(TypeCacheRefresher.class); @@ -22,19 +26,91 @@ public TypeCacheRefresher(final AtlasTypeDefStore typeDefStore) { } public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { - typeDefStore.reloadCustomTypeDefs(); - /*if (isCacheRefreshNeeded(redisService)) { - LOG.info("Refreshing type-def cache as the version is different from latest"); - typeDefStore.reloadCustomTypeDefs(); - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); - AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(currentRedisVersion); - }*/ + if (isEnumTypeDefCacheRefreshNeeded(redisService)) { + LOG.info("Refreshing enum type-def cache as the version is different from latest"); + typeDefStore.reloadEnumTypeDefs(); + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); + AtlasTypeDefStoreInitializer.setCurrentEnumTypedefInternalVersion(currentRedisVersion); + } + + if (isBMTypeDefCacheRefreshNeeded(redisService)) { + LOG.info("Refreshing BM type-def cache as the version is different from latest"); + typeDefStore.reloadBusinessMetadataTypeDefs(); + long currentRedisVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); + AtlasTypeDefStoreInitializer.setCurrentBMTypedefInternalVersion(currentRedisVersion); + } + + if (isClassificationTypeDefCacheRefreshNeeded(redisService)) { + LOG.info("Refreshing Classification type-def cache as the version is different from latest"); + typeDefStore.reloadClassificationMetadataTypeDefs(); + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); + AtlasTypeDefStoreInitializer.setCurrentClassificationTypedefInternalVersion(currentRedisVersion); + } + } + + public void updateVersion(RedisService redisService, AtlasTypesDef atlasTypesDef) { + if (atlasTypesDef == null) { + return; + } + if (CollectionUtils.isNotEmpty(atlasTypesDef.getBusinessMetadataDefs())) { + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentBMTypedefInternalVersion(latestVersion); + } + if (CollectionUtils.isNotEmpty(atlasTypesDef.getClassificationDefs())) { + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentClassificationTypedefInternalVersion(latestVersion); + } + if (CollectionUtils.isNotEmpty(atlasTypesDef.getEnumDefs())) { + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentEnumTypedefInternalVersion(latestVersion); + } } - private boolean isCacheRefreshNeeded(RedisService redisService) { - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); - LOG.info("Current Redis typedef version: {}, Latest typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion()); - return AtlasTypeDefStoreInitializer.getCurrentTypedefInternalVersion() < currentRedisVersion; + public void updateVersion(RedisService redisService, AtlasBaseTypeDef atlasBaseTypeDef) { + if (atlasBaseTypeDef == null) { + return; + } + if (atlasBaseTypeDef instanceof AtlasBusinessMetadataDef) { + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentBMTypedefInternalVersion(latestVersion); + } + if (atlasBaseTypeDef instanceof AtlasClassificationDef) { + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentClassificationTypedefInternalVersion(latestVersion); + } + if (atlasBaseTypeDef instanceof AtlasEnumDef) { + long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, latestVersionStr); + AtlasTypeDefStoreInitializer.setCurrentEnumTypedefInternalVersion(latestVersion); + } } + private boolean isEnumTypeDefCacheRefreshNeeded(RedisService redisService) { + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); + LOG.info("Current Redis Enum typedef version: {}, Latest Enum typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentEnumTypedefInternalVersion()); + return AtlasTypeDefStoreInitializer.getCurrentEnumTypedefInternalVersion() < currentRedisVersion; + } + + private boolean isBMTypeDefCacheRefreshNeeded(RedisService redisService) { + long currentRedisVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); + LOG.info("Current Redis BM typedef version: {}, Latest BM typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentBMTypedefInternalVersion()); + return AtlasTypeDefStoreInitializer.getCurrentBMTypedefInternalVersion() < currentRedisVersion; + } + + private boolean isClassificationTypeDefCacheRefreshNeeded(RedisService redisService) { + long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); + LOG.info("Current Redis Classification typedef version: {}, Latest Classification typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentClassificationTypedefInternalVersion()); + return AtlasTypeDefStoreInitializer.getCurrentClassificationTypedefInternalVersion() < currentRedisVersion; + } } \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index e345155642..b54462707a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -103,7 +103,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private final AtlasGraph graph; private final AtlasPatchManager patchManager; private final RedisService redisService; - private static long CURRENT_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION; @Inject public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, @@ -122,7 +124,9 @@ public void init() { if (!HAConfiguration.isHAEnabled(conf)) { startInternal(); - CURRENT_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CACHE_LATEST_VERSION, "1")); + CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); + CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); + CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); } else { LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation"); } @@ -412,12 +416,28 @@ public int getHandlerOrder() { return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder(); } - public static long getCurrentTypedefInternalVersion() { - return CURRENT_TYPEDEF_INTERNAL_VERSION; + public static long getCurrentEnumTypedefInternalVersion() { + return CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION; } - public static void setCurrentTypedefInternalVersion(long version) { - CURRENT_TYPEDEF_INTERNAL_VERSION = version; + public static void setCurrentEnumTypedefInternalVersion(long version) { + CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentBMTypedefInternalVersion() { + return CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentBMTypedefInternalVersion(long version) { + CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentClassificationTypedefInternalVersion() { + return CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentClassificationTypedefInternalVersion(long version) { + CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = version; } private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index 2486cf73c8..c18c0d9a0c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -123,22 +123,50 @@ public void init() throws AtlasBaseException { } @Override - public void reloadCustomTypeDefs() throws AtlasBaseException { - LOG.info("==> AtlasTypeDefGraphStore.initWithoutLock()"); + public void reloadEnumTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadEnumTypeDefs()"); AtlasTransientTypeRegistry ttr = null; boolean commitUpdates = false; try { ttr = typeRegistry.lockTypeRegistryForUpdate(5); - List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); - List classificationDefs = getClassificationDefStore(ttr).getAll(); List enumDefs = getEnumDefStore(ttr).getAll(); + ttr.updateTypes(enumDefs); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadEnumTypeDefs()"); + } + } + + @Override + public void reloadBusinessMetadataTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadBusinessMetadataTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); ttr.updateTypes(businessMetadataDefs); + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadBusinessMetadataTypeDefs()"); + } + } + + @Override + public void reloadClassificationMetadataTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadClassificationMetadataTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List classificationDefs = getClassificationDefStore(ttr).getAll(); ttr.updateTypes(classificationDefs); - ttr.updateTypes(enumDefs); } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); - LOG.info("<== AtlasTypeDefGraphStore.initWithoutLock()"); + LOG.info("<== AtlasTypeDefGraphStore.reloadClassificationMetadataTypeDefs()"); } } @@ -719,7 +747,7 @@ public void deleteTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException { @Override @GraphTransaction - public void deleteTypeByName(String typeName) throws AtlasBaseException { + public AtlasBaseTypeDef deleteTypeByName(String typeName) throws AtlasBaseException { AtlasType atlasType = typeRegistry.getType(typeName); if (atlasType == null) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS.TYPE_NAME_NOT_FOUND, typeName); @@ -743,6 +771,7 @@ public void deleteTypeByName(String typeName) throws AtlasBaseException { } deleteTypesDef(typesDef); + return baseTypeDef; } @Override diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java index f19aa226f0..473127aaba 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java @@ -126,16 +126,6 @@ public void init() throws AtlasBaseException { LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); } - @Override - @GraphTransaction - public void reloadCustomTypeDefs() throws AtlasBaseException { - LOG.info("==> AtlasTypeDefGraphStoreV1.initWithoutLock()"); - - super.reloadCustomTypeDefs(); - - LOG.info("<== AtlasTypeDefGraphStoreV1.initWithoutLock()"); - } - AtlasGraph getAtlasGraph() { return atlasGraph; } @VisibleForTesting diff --git a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java index fdca2e94e5..23ff4166c8 100644 --- a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java +++ b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java @@ -35,7 +35,11 @@ public interface AtlasTypeDefStore { void init() throws AtlasBaseException; - void reloadCustomTypeDefs() throws AtlasBaseException; + void reloadEnumTypeDefs() throws AtlasBaseException; + + void reloadBusinessMetadataTypeDefs() throws AtlasBaseException; + + void reloadClassificationMetadataTypeDefs() throws AtlasBaseException; /* EnumDef operations */ @@ -112,7 +116,7 @@ AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassific AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException; - void deleteTypeByName(String typeName) throws AtlasBaseException; + AtlasBaseTypeDef deleteTypeByName(String typeName) throws AtlasBaseException; void notifyLoadCompletion(); } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java index 2740fdcd4b..3654fca502 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java @@ -91,7 +91,7 @@ private synchronized void refreshTypeDef(int expectedFieldKeys,final String trac LOG.info("Found desired size of fieldKeys in iteration {} :: traceId {}", counter, traceId); } //Reload in-memory cache of type-registry - typeDefStore.reloadCustomTypeDefs(); + typeDefStore.init(); LOG.info("Size of field keys after refresh = {}", provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size()); LOG.info("Completed type-def cache refresh :: traceId {}", traceId); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index a3c629265a..1c07e0bece 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -58,7 +58,6 @@ import static org.apache.atlas.AtlasErrorCode.APPLICABLE_ENTITY_TYPES_DELETION_NOT_SUPPORTED; import static org.apache.atlas.AtlasErrorCode.ATTRIBUTE_DELETION_NOT_SUPPORTED; -import static org.apache.atlas.repository.Constants.TYPEDEF_CACHE_LATEST_VERSION; import static org.apache.atlas.web.filters.AuditFilter.X_ATLAN_CLIENT_ORIGIN; /** @@ -433,6 +432,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ AtlasPerfTracer perf = null; validateBuiltInTypeNames(typesDef); RequestContext.get().setTraceId(UUID.randomUUID().toString()); + AtlasTypesDef atlasTypesDef = null; try { typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { @@ -444,7 +444,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ typesDef.getBusinessMetadataDefs().forEach(AtlasBusinessMetadataDef::setRandomNameForEntityAndAttributeDefs); typesDef.getClassificationDefs().forEach(AtlasClassificationDef::setRandomNameForEntityAndAttributeDefs); String clientOrigin = servletRequest.getHeader(X_ATLAN_CLIENT_ORIGIN); - AtlasTypesDef atlasTypesDef = createTypeDefsWithRetry(typesDef, clientOrigin); + atlasTypesDef = createTypeDefsWithRetry(typesDef, clientOrigin); return atlasTypesDef; } catch (AtlasBaseException atlasBaseException) { LOG.error("TypesREST.createAtlasTypeDefs:: " + atlasBaseException.getMessage(), atlasBaseException); @@ -455,10 +455,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ } finally { if (lock != null) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); + typeCacheRefresher.updateVersion(redisService, atlasTypesDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -483,6 +480,7 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ validateBuiltInTypeNames(typesDef); RequestContext.get().setTraceId(UUID.randomUUID().toString()); Lock lock = null; + AtlasTypesDef atlasTypesDef = null; try { typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { @@ -514,7 +512,7 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ RequestContext.get().setAllowDuplicateDisplayName(allowDuplicateDisplayName); LOG.info("TypesRest.updateAtlasTypeDefs:: Typedef patch enabled:" + patch); String clientOrigin = servletRequest.getHeader(X_ATLAN_CLIENT_ORIGIN); - AtlasTypesDef atlasTypesDef = updateTypeDefsWithRetry(typesDef, clientOrigin); + atlasTypesDef = updateTypeDefsWithRetry(typesDef, clientOrigin); LOG.info("TypesRest.updateAtlasTypeDefs:: Done"); return atlasTypesDef; } catch (AtlasBaseException atlasBaseException) { @@ -525,10 +523,7 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ throw new AtlasBaseException("Error while updating a type definition"); } finally { if (lock != null) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); + typeCacheRefresher.updateVersion(redisService, atlasTypesDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } RequestContext.clear(); @@ -569,10 +564,7 @@ public void deleteAtlasTypeDefs(@Context HttpServletRequest servletRequest, fina throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); + typeCacheRefresher.updateVersion(redisService, typesDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -593,13 +585,14 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) AtlasPerfTracer perf = null; Lock lock = null; RequestContext.get().setTraceId(UUID.randomUUID().toString()); + AtlasBaseTypeDef atlasBaseTypeDef = null; try { typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.deleteAtlasTypeByName(" + typeName + ")"); } lock = attemptAcquiringLockV2(); - deleteTypeByNameWithRetry(typeName); + atlasBaseTypeDef = deleteTypeByNameWithRetry(typeName); } catch (AtlasBaseException atlasBaseException) { LOG.error("TypesREST.deleteAtlasTypeByName:: " + atlasBaseException.getMessage(), atlasBaseException); throw atlasBaseException; @@ -608,10 +601,7 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentTypedefInternalVersion(latestVersion); + typeCacheRefresher.updateVersion(redisService, atlasBaseTypeDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -814,15 +804,15 @@ private void deleteTypeDefsWithRetry(AtlasTypesDef typesDef, String clientOrigin throw lastException; // Should never reach here, but for completeness } - private void deleteTypeByNameWithRetry(String typeName) throws AtlasBaseException { + private AtlasBaseTypeDef deleteTypeByNameWithRetry(String typeName) throws AtlasBaseException { AtlasBaseException lastException = null; for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) { try { // Perform the deletion - typeDefStore.deleteTypeByName(typeName); + AtlasBaseTypeDef atlasBaseTypeDef = typeDefStore.deleteTypeByName(typeName); LOG.info("Successfully deleted typedef '{}' on attempt {}", typeName, attempt); - return; + return atlasBaseTypeDef; } catch (AtlasBaseException e) { lastException = e; From b91db523c793b47dec6d5e291a1b4e269be1085e Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Tue, 16 Sep 2025 20:49:59 +0530 Subject: [PATCH 11/16] MLH-1240 commit update --- .../atlas/repository/store/graph/AtlasTypeDefGraphStore.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index c18c0d9a0c..bbefc55d01 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -132,6 +132,7 @@ public void reloadEnumTypeDefs() throws AtlasBaseException { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List enumDefs = getEnumDefStore(ttr).getAll(); ttr.updateTypes(enumDefs); + commitUpdates = true; } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); LOG.info("<== AtlasTypeDefGraphStore.reloadEnumTypeDefs()"); @@ -148,6 +149,7 @@ public void reloadBusinessMetadataTypeDefs() throws AtlasBaseException { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); ttr.updateTypes(businessMetadataDefs); + commitUpdates = true; } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); LOG.info("<== AtlasTypeDefGraphStore.reloadBusinessMetadataTypeDefs()"); @@ -164,6 +166,7 @@ public void reloadClassificationMetadataTypeDefs() throws AtlasBaseException { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List classificationDefs = getClassificationDefStore(ttr).getAll(); ttr.updateTypes(classificationDefs); + commitUpdates = true; } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); LOG.info("<== AtlasTypeDefGraphStore.reloadClassificationMetadataTypeDefs()"); From ce513e4dcf87ac648a379f17f20a748ad989957a Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Wed, 17 Sep 2025 18:29:42 +0530 Subject: [PATCH 12/16] MLH-1240 clear that type of atlas typedef and reload to handle deletes correctly --- .../store/graph/AtlasTypeDefGraphStore.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index bbefc55d01..df1a8e3718 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -131,7 +131,12 @@ public void reloadEnumTypeDefs() throws AtlasBaseException { try { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List enumDefs = getEnumDefStore(ttr).getAll(); - ttr.updateTypes(enumDefs); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllEnumDefs()) { + if (atlasBaseTypeDef instanceof AtlasEnumDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(enumDefs); commitUpdates = true; } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); @@ -148,7 +153,12 @@ public void reloadBusinessMetadataTypeDefs() throws AtlasBaseException { try { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); - ttr.updateTypes(businessMetadataDefs); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllBusinessMetadataDefs()) { + if (atlasBaseTypeDef instanceof AtlasBusinessMetadataDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(businessMetadataDefs); commitUpdates = true; } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); @@ -165,7 +175,12 @@ public void reloadClassificationMetadataTypeDefs() throws AtlasBaseException { try { ttr = typeRegistry.lockTypeRegistryForUpdate(5); List classificationDefs = getClassificationDefStore(ttr).getAll(); - ttr.updateTypes(classificationDefs); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllClassificationDefs()) { + if (atlasBaseTypeDef instanceof AtlasClassificationDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(classificationDefs); commitUpdates = true; } finally { typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); From 0cf115536c40d22534bb3673c5b9d55598e4baaa Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Thu, 18 Sep 2025 00:11:17 +0530 Subject: [PATCH 13/16] MLH-1240 add struct, enum and relationship def and refactor cache refresher --- .../apache/atlas/repository/Constants.java | 3 + .../repository/graph/TypeCacheRefresher.java | 225 ++++++++++++------ .../AtlasTypeDefStoreInitializer.java | 30 +++ .../store/graph/AtlasTypeDefGraphStore.java | 66 +++++ .../apache/atlas/store/AtlasTypeDefStore.java | 6 + 5 files changed, 260 insertions(+), 70 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index d4e334432e..177e7783f7 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -541,6 +541,9 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV } public static final String TYPEDEF_ENUM_CACHE_LATEST_VERSION = "typdef.enum.cache.version"; public static final String TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION = "typdef.bm.cache.version"; public static final String TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION = "typdef.cls.cache.version"; + public static final String TYPEDEF_STRUCT_CACHE_LATEST_VERSION = "typdef.struct.cache.version"; + public static final String TYPEDEF_ENTITY_CACHE_LATEST_VERSION = "typdef.entity.cache.version"; + public static final String TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION = "typdef.relationship.cache.version"; private Constants() { } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index 3596dcd6ba..053538541b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -2,7 +2,6 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.typedef.*; -import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.store.AtlasTypeDefStore; @@ -12,6 +11,11 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; import static org.apache.atlas.repository.Constants.*; @@ -20,55 +24,163 @@ public class TypeCacheRefresher { private static final Logger LOG = LoggerFactory.getLogger(TypeCacheRefresher.class); private final AtlasTypeDefStore typeDefStore; + // Define type metadata to make code generic + private static class TypeDefMetadata { + final String redisKey; + final String typeName; + final Supplier getCurrentVersion; + final Consumer setCurrentVersion; + final Runnable reloadTypeDefs; + + TypeDefMetadata(String redisKey, String typeName, + Supplier getCurrentVersion, + Consumer setCurrentVersion, + Runnable reloadTypeDefs) { + this.redisKey = redisKey; + this.typeName = typeName; + this.getCurrentVersion = getCurrentVersion; + this.setCurrentVersion = setCurrentVersion; + this.reloadTypeDefs = reloadTypeDefs; + } + } + + // Map of all type definitions and their metadata + private final Map, TypeDefMetadata> typeDefMetadataMap; + @Inject public TypeCacheRefresher(final AtlasTypeDefStore typeDefStore) { this.typeDefStore = typeDefStore; + this.typeDefMetadataMap = new HashMap<>(); + + // Initialize metadata for each type + typeDefMetadataMap.put(AtlasBusinessMetadataDef.class, new TypeDefMetadata( + TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, + "BM", + AtlasTypeDefStoreInitializer::getCurrentBMTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentBMTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadBusinessMetadataTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading BM typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasClassificationDef.class, new TypeDefMetadata( + TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, + "Classification", + AtlasTypeDefStoreInitializer::getCurrentClassificationTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentClassificationTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadClassificationMetadataTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Classification typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasEnumDef.class, new TypeDefMetadata( + TYPEDEF_ENUM_CACHE_LATEST_VERSION, + "Enum", + AtlasTypeDefStoreInitializer::getCurrentEnumTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentEnumTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadEnumTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Enum typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasStructDef.class, new TypeDefMetadata( + TYPEDEF_STRUCT_CACHE_LATEST_VERSION, + "Struct", + AtlasTypeDefStoreInitializer::getCurrentStructTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentStructTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadStructTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Struct typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasEntityDef.class, new TypeDefMetadata( + TYPEDEF_ENTITY_CACHE_LATEST_VERSION, + "Entity", + AtlasTypeDefStoreInitializer::getCurrentEntityTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentEntityTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadEntityTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Entity typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasRelationshipDef.class, new TypeDefMetadata( + TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION, + "Relationship", + AtlasTypeDefStoreInitializer::getCurrentRelationshipTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentRelationshipTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadRelationshipTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Relationship typedefs", e); + } + } + )); } public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { - if (isEnumTypeDefCacheRefreshNeeded(redisService)) { - LOG.info("Refreshing enum type-def cache as the version is different from latest"); - typeDefStore.reloadEnumTypeDefs(); - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); - AtlasTypeDefStoreInitializer.setCurrentEnumTypedefInternalVersion(currentRedisVersion); - } - - if (isBMTypeDefCacheRefreshNeeded(redisService)) { - LOG.info("Refreshing BM type-def cache as the version is different from latest"); - typeDefStore.reloadBusinessMetadataTypeDefs(); - long currentRedisVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); - AtlasTypeDefStoreInitializer.setCurrentBMTypedefInternalVersion(currentRedisVersion); + for (TypeDefMetadata metadata : typeDefMetadataMap.values()) { + if (isTypeDefCacheRefreshNeeded(redisService, metadata)) { + LOG.info("Refreshing {} type-def cache as the version is different from latest", metadata.typeName); + metadata.reloadTypeDefs.run(); + long currentRedisVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")); + metadata.setCurrentVersion.accept(currentRedisVersion); + } } + } - if (isClassificationTypeDefCacheRefreshNeeded(redisService)) { - LOG.info("Refreshing Classification type-def cache as the version is different from latest"); - typeDefStore.reloadClassificationMetadataTypeDefs(); - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); - AtlasTypeDefStoreInitializer.setCurrentClassificationTypedefInternalVersion(currentRedisVersion); - } + private boolean isTypeDefCacheRefreshNeeded(RedisService redisService, TypeDefMetadata metadata) { + long currentRedisVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")); + long currentInternalVersion = metadata.getCurrentVersion.get(); + LOG.info("Current Redis {} typedef version: {}, Latest {} typedef version: {}", + metadata.typeName, currentRedisVersion, metadata.typeName, currentInternalVersion); + return currentInternalVersion < currentRedisVersion; } public void updateVersion(RedisService redisService, AtlasTypesDef atlasTypesDef) { if (atlasTypesDef == null) { return; } - if (CollectionUtils.isNotEmpty(atlasTypesDef.getBusinessMetadataDefs())) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentBMTypedefInternalVersion(latestVersion); - } - if (CollectionUtils.isNotEmpty(atlasTypesDef.getClassificationDefs())) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentClassificationTypedefInternalVersion(latestVersion); - } - if (CollectionUtils.isNotEmpty(atlasTypesDef.getEnumDefs())) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentEnumTypedefInternalVersion(latestVersion); + + updateTypeDefVersions(redisService, atlasTypesDef.getBusinessMetadataDefs(), AtlasBusinessMetadataDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getClassificationDefs(), AtlasClassificationDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getEnumDefs(), AtlasEnumDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getStructDefs(), AtlasStructDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getEntityDefs(), AtlasEntityDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getRelationshipDefs(), AtlasRelationshipDef.class); + } + + private void updateTypeDefVersions(RedisService redisService, + List typeDefs, + Class typeClass) { + if (CollectionUtils.isNotEmpty(typeDefs)) { + TypeDefMetadata metadata = typeDefMetadataMap.get(typeClass); + if (metadata != null) { + long latestVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(metadata.redisKey, latestVersionStr); + metadata.setCurrentVersion.accept(latestVersion); + } } } @@ -76,41 +188,14 @@ public void updateVersion(RedisService redisService, AtlasBaseTypeDef atlasBaseT if (atlasBaseTypeDef == null) { return; } - if (atlasBaseTypeDef instanceof AtlasBusinessMetadataDef) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentBMTypedefInternalVersion(latestVersion); - } - if (atlasBaseTypeDef instanceof AtlasClassificationDef) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")) + 1; - String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentClassificationTypedefInternalVersion(latestVersion); - } - if (atlasBaseTypeDef instanceof AtlasEnumDef) { - long latestVersion = Long.parseLong(redisService.getValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")) + 1; + + TypeDefMetadata metadata = typeDefMetadataMap.get(atlasBaseTypeDef.getClass()); + if (metadata != null) { + long latestVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")) + 1; String latestVersionStr = String.valueOf(latestVersion); - redisService.putValue(TYPEDEF_ENUM_CACHE_LATEST_VERSION, latestVersionStr); - AtlasTypeDefStoreInitializer.setCurrentEnumTypedefInternalVersion(latestVersion); + redisService.putValue(metadata.redisKey, latestVersionStr); + metadata.setCurrentVersion.accept(latestVersion); } } - private boolean isEnumTypeDefCacheRefreshNeeded(RedisService redisService) { - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); - LOG.info("Current Redis Enum typedef version: {}, Latest Enum typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentEnumTypedefInternalVersion()); - return AtlasTypeDefStoreInitializer.getCurrentEnumTypedefInternalVersion() < currentRedisVersion; - } - - private boolean isBMTypeDefCacheRefreshNeeded(RedisService redisService) { - long currentRedisVersion = Long.parseLong(redisService.getValue(TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); - LOG.info("Current Redis BM typedef version: {}, Latest BM typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentBMTypedefInternalVersion()); - return AtlasTypeDefStoreInitializer.getCurrentBMTypedefInternalVersion() < currentRedisVersion; - } - - private boolean isClassificationTypeDefCacheRefreshNeeded(RedisService redisService) { - long currentRedisVersion = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); - LOG.info("Current Redis Classification typedef version: {}, Latest Classification typedef version: {}", currentRedisVersion, AtlasTypeDefStoreInitializer.getCurrentClassificationTypedefInternalVersion()); - return AtlasTypeDefStoreInitializer.getCurrentClassificationTypedefInternalVersion() < currentRedisVersion; - } -} \ No newline at end of file +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index b54462707a..5ea59b1206 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -106,6 +106,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private static long CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION; private static long CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION; private static long CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION; @Inject public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, @@ -127,6 +130,9 @@ public void init() { CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); + CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_STRUCT_CACHE_LATEST_VERSION, "1")); + CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENTITY_CACHE_LATEST_VERSION, "1")); + CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION, "1")); } else { LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation"); } @@ -440,6 +446,30 @@ public static void setCurrentClassificationTypedefInternalVersion(long version) CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = version; } + public static long getCurrentStructTypedefInternalVersion() { + return CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentStructTypedefInternalVersion(long version) { + CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentEntityTypedefInternalVersion() { + return CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentEntityTypedefInternalVersion(long version) { + CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentRelationshipTypedefInternalVersion() { + return CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentRelationshipTypedefInternalVersion(long version) { + CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION = version; + } + private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index df1a8e3718..943336a47d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -144,6 +144,72 @@ public void reloadEnumTypeDefs() throws AtlasBaseException { } } + @Override + public void reloadStructTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadStructTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List structDefs = getStructDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllStructDefs()) { + if (atlasBaseTypeDef instanceof AtlasStructDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(structDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadStructTypeDefs()"); + } + } + + @Override + public void reloadEntityTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadEntityTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List entityDefs = getEntityDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllEntityDefs()) { + if (atlasBaseTypeDef instanceof AtlasEntityDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(entityDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadEntityTypeDefs()"); + } + } + + @Override + public void reloadRelationshipTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadRelationshipTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List relationshipDefs = getRelationshipDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllRelationshipDefs()) { + if (atlasBaseTypeDef instanceof AtlasRelationshipDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(relationshipDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadRelationshipTypeDefs()"); + } + } + @Override public void reloadBusinessMetadataTypeDefs() throws AtlasBaseException { LOG.info("==> AtlasTypeDefGraphStore.reloadBusinessMetadataTypeDefs()"); diff --git a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java index 23ff4166c8..990a637d2f 100644 --- a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java +++ b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java @@ -41,6 +41,12 @@ public interface AtlasTypeDefStore { void reloadClassificationMetadataTypeDefs() throws AtlasBaseException; + void reloadStructTypeDefs() throws AtlasBaseException; + + void reloadEntityTypeDefs() throws AtlasBaseException; + + void reloadRelationshipTypeDefs() throws AtlasBaseException; + /* EnumDef operations */ AtlasEnumDef getEnumDefByName(String name) throws AtlasBaseException; From 34f72fe831023fbdeaf5e55a0e369e198a541f46 Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Fri, 19 Sep 2025 01:50:22 +0530 Subject: [PATCH 14/16] remove other redis implementations --- .github/workflows/maven.yml | 2 +- .../atlas/service/FeatureFlagStore.java | 5 -- .../service/redis/NoRedisServiceImpl.java | 52 ------------------- .../atlas/service/redis/RedisServiceImpl.java | 4 +- .../service/redis/RedisServiceLocalImpl.java | 49 ----------------- 5 files changed, 2 insertions(+), 110 deletions(-) delete mode 100644 common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java delete mode 100644 common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 4a50398150..4592348b21 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -31,7 +31,7 @@ on: - tagscanarymerge - fixlabels - interceptapis - - mlh-1240-custom-metadata-consistency + - fix-redis-wiring jobs: build: diff --git a/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java b/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java index 5b28b00653..599b570473 100644 --- a/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java +++ b/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java @@ -1,6 +1,5 @@ package org.apache.atlas.service; -import org.apache.atlas.service.redis.NoRedisServiceImpl; import org.apache.atlas.service.redis.RedisService; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeansException; @@ -106,10 +105,6 @@ public void initialize() throws InterruptedException { private void validateDependencies() { LOG.info("Validating FeatureFlagStore dependencies..."); try { - if (redisService instanceof NoRedisServiceImpl) { - return; - } - // Test Redis connectivity with a simple operation String testKey = "ff:_health_check"; redisService.putValue(testKey, "test"); diff --git a/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java deleted file mode 100644 index f4e1ee035d..0000000000 --- a/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.atlas.service.redis; - -import org.apache.atlas.annotation.ConditionalOnAtlasProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -@Component("redisServiceImpl") -@ConditionalOnAtlasProperty(property = "atlas.redis.service.impl", isDefault = true) -public class NoRedisServiceImpl extends AbstractRedisService { - - private static final Logger LOG = LoggerFactory.getLogger(NoRedisServiceImpl.class); - - @PostConstruct - public void init() { - LOG.info("Enabled no redis implementation."); - } - - @Override - public boolean acquireDistributedLock(String key) { - //do nothing - return true; - } - - @Override - public void releaseDistributedLock(String key) { - //do nothing - } - - @Override - public String getValue(String key) { - return null; - } - - @Override - public String putValue(String key, String value, int timeout) { - return null; - } - - @Override - public void removeValue(String key) { - - } - - @Override - public Logger getLogger() { - return LOG; - } - -} diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java index b8f2ceef29..67b33c9f56 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java @@ -1,7 +1,6 @@ package org.apache.atlas.service.redis; import org.apache.atlas.AtlasException; -import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.service.metrics.MetricUtils; import org.redisson.Redisson; import org.slf4j.Logger; @@ -13,9 +12,8 @@ import javax.annotation.PostConstruct; @Component -@ConditionalOnAtlasProperty(property = "atlas.redis.service.impl") @Order(Ordered.HIGHEST_PRECEDENCE) -public class RedisServiceImpl extends AbstractRedisService{ +public class RedisServiceImpl extends AbstractRedisService { private static final Logger LOG = LoggerFactory.getLogger(RedisServiceImpl.class); private static final long RETRY_DELAY_MS = 1000L; diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java deleted file mode 100644 index 0f84ff3ac5..0000000000 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.apache.atlas.service.redis; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.annotation.ConditionalOnAtlasProperty; -import org.redisson.Redisson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -@Component("redisServiceImpl") -@ConditionalOnAtlasProperty(property = "atlas.redis.service.impl") -public class RedisServiceLocalImpl extends AbstractRedisService { - - private static final Logger LOG = LoggerFactory.getLogger(RedisServiceLocalImpl.class); - - @PostConstruct - public void init() throws AtlasException { - redisClient = Redisson.create(getLocalConfig()); - redisCacheClient = Redisson.create(getLocalConfig()); - LOG.info("Local redis client created successfully."); - } - - @Override - public String getValue(String key) { - return super.getValue(key); - } - - @Override - public String getValue(String key, String defaultValue) { - return null; - } - - @Override - public String putValue(String key, String value, int timeout) { - return super.putValue(key, value, timeout); - } - - @Override - public void removeValue(String key) { - super.removeValue(key); - } - - @Override - public Logger getLogger() { - return LOG; - } -} From 16da29d9356cdbba9db6622caa375b439196aa5a Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Fri, 19 Sep 2025 17:21:02 +0530 Subject: [PATCH 15/16] MLH-1277 fix for delete tags correctly --- .../store/graph/v1/SoftDeleteHandlerV1.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java index 3d8ae59ed0..97a05a4aa8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java @@ -39,9 +39,7 @@ import java.util.Collection; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; -import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; -import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; public class SoftDeleteHandlerV1 extends DeleteHandlerV1 { @@ -77,8 +75,15 @@ protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseExcepti LOG.debug("==> SoftDeleteHandlerV1.deleteEdge({}, {})", GraphHelper.string(edge), force); } - if (edge == null || getTypeName(edge) == null) { - LOG.warn("Edge is null or its typeName is empty. Nothing to delete"); + if (edge == null) { + LOG.warn("Edge is null. Nothing to delete"); + return; + } + + //tag vertex do not have typeName, but they have a label + if (!CLASSIFICATION_LABEL.equalsIgnoreCase(edge.getLabel()) + && getTypeName(edge) == null) { + LOG.warn("Edge is not a tag type and typeName is empty. Nothing to delete"); return; } From bf37bc7b37a83d8dab88c101aca548044d4fb38d Mon Sep 17 00:00:00 2001 From: sriram-atlan Date: Fri, 19 Sep 2025 17:21:27 +0530 Subject: [PATCH 16/16] MLH-1277 branch to build --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 4592348b21..e37e48aa84 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -31,7 +31,7 @@ on: - tagscanarymerge - fixlabels - interceptapis - - fix-redis-wiring + - 1277-master jobs: build: