diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index def7e407cfd..0c623135d4a 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -536,6 +536,13 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV } add(STAKEHOLDER_TITLE_ENTITY_TYPE); }}; + public static final String TYPEDEF_ENUM_CACHE_LATEST_VERSION = "typdef.enum.cache.version"; + public static final String TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION = "typdef.bm.cache.version"; + public static final String TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION = "typdef.cls.cache.version"; + public static final String TYPEDEF_STRUCT_CACHE_LATEST_VERSION = "typdef.struct.cache.version"; + public static final String TYPEDEF_ENTITY_CACHE_LATEST_VERSION = "typdef.entity.cache.version"; + public static final String TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION = "typdef.relationship.cache.version"; + private Constants() { } diff --git a/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java b/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java index 196ea9e228f..569f79c4275 100644 --- a/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java +++ b/common/src/main/java/org/apache/atlas/service/FeatureFlagStore.java @@ -1,6 +1,5 @@ package org.apache.atlas.service; -import org.apache.atlas.service.redis.NoRedisServiceImpl; import org.apache.atlas.service.redis.RedisService; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeansException; @@ -106,10 +105,6 @@ public void initialize() throws InterruptedException { private void validateDependencies() { LOG.info("Validating FeatureFlagStore dependencies..."); try { - if (redisService instanceof NoRedisServiceImpl) { - return; - } - // Test Redis connectivity with a simple operation String testKey = "ff:_health_check"; redisService.putValue(testKey, "test"); diff --git a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java index e729643d7e7..fc31c713857 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/AbstractRedisService.java @@ -5,6 +5,7 @@ import org.apache.atlas.service.metrics.MetricUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; @@ -191,6 +192,22 @@ public String getValue(String key) { } } + @Override + public String getValue(String key, String defaultValue) { + try { + String value = getValue(key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } else { + return value; + } + } catch (Exception e) { + MetricUtils.recordRedisConnectionFailure(); + getLogger().error("Redis getValue operation failed for key: {}", key, e); + throw e; + } + } + @Override public String putValue(String key, String value) { try { diff --git a/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java deleted file mode 100644 index f4e1ee035de..00000000000 --- a/common/src/main/java/org/apache/atlas/service/redis/NoRedisServiceImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.atlas.service.redis; - -import org.apache.atlas.annotation.ConditionalOnAtlasProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -@Component("redisServiceImpl") -@ConditionalOnAtlasProperty(property = "atlas.redis.service.impl", isDefault = true) -public class NoRedisServiceImpl extends AbstractRedisService { - - private static final Logger LOG = LoggerFactory.getLogger(NoRedisServiceImpl.class); - - @PostConstruct - public void init() { - LOG.info("Enabled no redis implementation."); - } - - @Override - public boolean acquireDistributedLock(String key) { - //do nothing - return true; - } - - @Override - public void releaseDistributedLock(String key) { - //do nothing - } - - @Override - public String getValue(String key) { - return null; - } - - @Override - public String putValue(String key, String value, int timeout) { - return null; - } - - @Override - public void removeValue(String key) { - - } - - @Override - public Logger getLogger() { - return LOG; - } - -} diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisService.java b/common/src/main/java/org/apache/atlas/service/redis/RedisService.java index 083793ea71e..ae395c5dc33 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisService.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisService.java @@ -16,6 +16,8 @@ public interface RedisService { String getValue(String key); + String getValue(String key, String defaultValue); + String putValue(String key, String value); String putValue(String key, String value, int timeout); diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java index b8f2ceef294..67b33c9f565 100644 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java +++ b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceImpl.java @@ -1,7 +1,6 @@ package org.apache.atlas.service.redis; import org.apache.atlas.AtlasException; -import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.service.metrics.MetricUtils; import org.redisson.Redisson; import org.slf4j.Logger; @@ -13,9 +12,8 @@ import javax.annotation.PostConstruct; @Component -@ConditionalOnAtlasProperty(property = "atlas.redis.service.impl") @Order(Ordered.HIGHEST_PRECEDENCE) -public class RedisServiceImpl extends AbstractRedisService{ +public class RedisServiceImpl extends AbstractRedisService { private static final Logger LOG = LoggerFactory.getLogger(RedisServiceImpl.class); private static final long RETRY_DELAY_MS = 1000L; diff --git a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java b/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java deleted file mode 100644 index e4e8b72a7e3..00000000000 --- a/common/src/main/java/org/apache/atlas/service/redis/RedisServiceLocalImpl.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.apache.atlas.service.redis; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.annotation.ConditionalOnAtlasProperty; -import org.redisson.Redisson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; - -@Component("redisServiceImpl") -@ConditionalOnAtlasProperty(property = "atlas.redis.service.impl") -public class RedisServiceLocalImpl extends AbstractRedisService { - - private static final Logger LOG = LoggerFactory.getLogger(RedisServiceLocalImpl.class); - - @PostConstruct - public void init() throws AtlasException { - redisClient = Redisson.create(getLocalConfig()); - redisCacheClient = Redisson.create(getLocalConfig()); - LOG.info("Local redis client created successfully."); - } - - @Override - public String getValue(String key) { - return super.getValue(key); - } - - @Override - public String putValue(String key, String value, int timeout) { - return super.putValue(key, value, timeout); - } - - @Override - public void removeValue(String key) { - super.removeValue(key); - } - - @Override - public Logger getLogger() { - return LOG; - } -} diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java index ac80b4b6ca7..053538541be 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypeCacheRefresher.java @@ -1,190 +1,201 @@ package org.apache.atlas.repository.graph; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.ha.HAConfiguration; -import org.apache.atlas.repository.RepositoryException; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang.StringUtils; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; +import org.apache.atlas.model.typedef.*; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; +import org.apache.atlas.service.redis.RedisService; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import javax.inject.Inject; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; -import static org.apache.atlas.AtlasErrorCode.CINV_UNHEALTHY; -import static org.apache.atlas.repository.Constants.VERTEX_INDEX; +import static org.apache.atlas.repository.Constants.*; @Component public class TypeCacheRefresher { private static final Logger LOG = LoggerFactory.getLogger(TypeCacheRefresher.class); - private String cacheRefresherEndpoint; - private String cacheRefresherHealthEndpoint; - private final IAtlasGraphProvider provider; - private boolean isActiveActiveHAEnabled; - - @Inject - public TypeCacheRefresher(final IAtlasGraphProvider provider) { - this.provider = provider; - } - - @PostConstruct - public void init() throws AtlasException { - Configuration configuration = ApplicationProperties.get(); - this.cacheRefresherEndpoint = configuration.getString("atlas.server.type.cache-refresher"); - this.cacheRefresherHealthEndpoint = configuration.getString("atlas.server.type.cache-refresher-health"); - this.isActiveActiveHAEnabled = HAConfiguration.isActiveActiveHAEnabled(configuration); - LOG.info("Found {} as cache-refresher endpoint", cacheRefresherEndpoint); - LOG.info("Found {} as cache-refresher-health endpoint", cacheRefresherHealthEndpoint); - } - - public void verifyCacheRefresherHealth() throws AtlasBaseException, IOException { - if (StringUtils.isBlank(cacheRefresherHealthEndpoint) || !isActiveActiveHAEnabled) { - LOG.info("Skipping type-def cache refresher health checking as URL is {} and isActiveActiveHAEnabled is {}", cacheRefresherHealthEndpoint, isActiveActiveHAEnabled); - return; - } - final String healthResponseBody; - try (CloseableHttpClient client = HttpClients.createDefault()) { - final HttpGet healthRequest = new HttpGet(cacheRefresherHealthEndpoint); - healthResponseBody = executeGet(client, healthRequest); - } - LOG.debug("Response Body from cache-refresh-health = {}", healthResponseBody); - final ObjectMapper mapper = new ObjectMapper(); - final CacheRefresherHealthResponse jsonResponse = mapper.readValue(healthResponseBody, CacheRefresherHealthResponse.class); - if (!"Healthy".equalsIgnoreCase(jsonResponse.getMessage())) { - throw new AtlasBaseException(CINV_UNHEALTHY); + private final AtlasTypeDefStore typeDefStore; + + // Define type metadata to make code generic + private static class TypeDefMetadata { + final String redisKey; + final String typeName; + final Supplier getCurrentVersion; + final Consumer setCurrentVersion; + final Runnable reloadTypeDefs; + + TypeDefMetadata(String redisKey, String typeName, + Supplier getCurrentVersion, + Consumer setCurrentVersion, + Runnable reloadTypeDefs) { + this.redisKey = redisKey; + this.typeName = typeName; + this.getCurrentVersion = getCurrentVersion; + this.setCurrentVersion = setCurrentVersion; + this.reloadTypeDefs = reloadTypeDefs; } } - public void refreshAllHostCache() throws IOException, URISyntaxException, RepositoryException { - final String traceId = RequestContext.get().getTraceId(); - if(StringUtils.isBlank(cacheRefresherEndpoint) || !isActiveActiveHAEnabled) { - LOG.info("Skipping type-def cache refresh :: traceId {}", traceId); - return; - } - - int totalFieldKeys = provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size(); - LOG.info("Found {} totalFieldKeys to be expected in other nodes :: traceId {}", totalFieldKeys, traceId); - refreshCache(totalFieldKeys, traceId); - } + // Map of all type definitions and their metadata + private final Map, TypeDefMetadata> typeDefMetadataMap; - private void refreshCache(final int totalFieldKeys, final String traceId) throws IOException, URISyntaxException { - URIBuilder builder = new URIBuilder(cacheRefresherEndpoint); - builder.setParameter("expectedFieldKeys", String.valueOf(totalFieldKeys)); - builder.setParameter("traceId", traceId); - final HttpPost httpPost = new HttpPost(builder.build()); - LOG.info("Invoking cache refresh endpoint {} :: traceId {}", cacheRefresherEndpoint, traceId); - - String responseBody; - try (CloseableHttpClient client = HttpClients.createDefault()) { - responseBody = executePost(traceId, client, httpPost); - } - LOG.info("Response Body from cache-refresh = {} :: traceId {}", responseBody, traceId); - CacheRefreshResponseEnvelope cacheRefreshResponseEnvelope = convertStringToObject(responseBody); - - for (CacheRefreshResponse responseOfEachNode : cacheRefreshResponseEnvelope.getResponse()) { - if (responseOfEachNode.getStatus() != 204) { - //Do not throw exception in this case as node must have been in passive state now - LOG.error("Error while performing cache refresh on host {} . HTTP code = {} :: traceId {}", responseOfEachNode.getHost(), - responseOfEachNode.getStatus(), traceId); - } else { - LOG.info("Host {} returns response code {} :: traceId {}", responseOfEachNode.getHost(), responseOfEachNode.getStatus(), traceId); + @Inject + public TypeCacheRefresher(final AtlasTypeDefStore typeDefStore) { + this.typeDefStore = typeDefStore; + this.typeDefMetadataMap = new HashMap<>(); + + // Initialize metadata for each type + typeDefMetadataMap.put(AtlasBusinessMetadataDef.class, new TypeDefMetadata( + TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, + "BM", + AtlasTypeDefStoreInitializer::getCurrentBMTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentBMTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadBusinessMetadataTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading BM typedefs", e); + } } - } - LOG.info("Refreshed cache successfully on all hosts :: traceId {}", traceId); - } - - private String executePost(String traceId, CloseableHttpClient client, HttpPost httpPost) throws IOException { - try (CloseableHttpResponse response = client.execute(httpPost)) { - LOG.info("Received HTTP response code {} from cache refresh endpoint :: traceId {}", response.getStatusLine().getStatusCode(), traceId); - if (response.getStatusLine().getStatusCode() != 200) { - throw new RuntimeException("Error while calling cache-refresher on host " + cacheRefresherEndpoint + ". HTTP code = " + response.getStatusLine().getStatusCode() + " :: traceId " + traceId); + )); + + typeDefMetadataMap.put(AtlasClassificationDef.class, new TypeDefMetadata( + TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, + "Classification", + AtlasTypeDefStoreInitializer::getCurrentClassificationTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentClassificationTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadClassificationMetadataTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Classification typedefs", e); + } } - return EntityUtils.toString(response.getEntity()); - } + )); + + typeDefMetadataMap.put(AtlasEnumDef.class, new TypeDefMetadata( + TYPEDEF_ENUM_CACHE_LATEST_VERSION, + "Enum", + AtlasTypeDefStoreInitializer::getCurrentEnumTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentEnumTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadEnumTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Enum typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasStructDef.class, new TypeDefMetadata( + TYPEDEF_STRUCT_CACHE_LATEST_VERSION, + "Struct", + AtlasTypeDefStoreInitializer::getCurrentStructTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentStructTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadStructTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Struct typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasEntityDef.class, new TypeDefMetadata( + TYPEDEF_ENTITY_CACHE_LATEST_VERSION, + "Entity", + AtlasTypeDefStoreInitializer::getCurrentEntityTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentEntityTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadEntityTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Entity typedefs", e); + } + } + )); + + typeDefMetadataMap.put(AtlasRelationshipDef.class, new TypeDefMetadata( + TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION, + "Relationship", + AtlasTypeDefStoreInitializer::getCurrentRelationshipTypedefInternalVersion, + AtlasTypeDefStoreInitializer::setCurrentRelationshipTypedefInternalVersion, + () -> { + try { + typeDefStore.reloadRelationshipTypeDefs(); + } catch (AtlasBaseException e) { + LOG.error("Error reloading Relationship typedefs", e); + } + } + )); } - private String executeGet(CloseableHttpClient client, HttpGet getRequest) throws IOException, AtlasBaseException { - try (CloseableHttpResponse closeableHttpResponse = client.execute(getRequest)) { - LOG.info("Received HTTP response code {} from cache refresh health endpoint", closeableHttpResponse.getStatusLine().getStatusCode()); - if (closeableHttpResponse.getStatusLine().getStatusCode() != 200) { - throw new AtlasBaseException(CINV_UNHEALTHY); + public void refreshCacheIfNeeded(RedisService redisService) throws AtlasBaseException { + for (TypeDefMetadata metadata : typeDefMetadataMap.values()) { + if (isTypeDefCacheRefreshNeeded(redisService, metadata)) { + LOG.info("Refreshing {} type-def cache as the version is different from latest", metadata.typeName); + metadata.reloadTypeDefs.run(); + long currentRedisVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")); + metadata.setCurrentVersion.accept(currentRedisVersion); } - return EntityUtils.toString(closeableHttpResponse.getEntity()); } } - private CacheRefreshResponseEnvelope convertStringToObject(final String responseBody) throws JsonProcessingException { - final ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(responseBody, CacheRefreshResponseEnvelope.class); + private boolean isTypeDefCacheRefreshNeeded(RedisService redisService, TypeDefMetadata metadata) { + long currentRedisVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")); + long currentInternalVersion = metadata.getCurrentVersion.get(); + LOG.info("Current Redis {} typedef version: {}, Latest {} typedef version: {}", + metadata.typeName, currentRedisVersion, metadata.typeName, currentInternalVersion); + return currentInternalVersion < currentRedisVersion; } -} -class CacheRefreshResponseEnvelope { - private List response; - - public List getResponse() { - return response; - } - - public void setResponse(List response) { - this.response = response; - } -} - -class CacheRefreshResponse { - private String host; - private int status; - private Map headers; - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } + public void updateVersion(RedisService redisService, AtlasTypesDef atlasTypesDef) { + if (atlasTypesDef == null) { + return; + } - public int getStatus() { - return status; + updateTypeDefVersions(redisService, atlasTypesDef.getBusinessMetadataDefs(), AtlasBusinessMetadataDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getClassificationDefs(), AtlasClassificationDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getEnumDefs(), AtlasEnumDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getStructDefs(), AtlasStructDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getEntityDefs(), AtlasEntityDef.class); + updateTypeDefVersions(redisService, atlasTypesDef.getRelationshipDefs(), AtlasRelationshipDef.class); + } + + private void updateTypeDefVersions(RedisService redisService, + List typeDefs, + Class typeClass) { + if (CollectionUtils.isNotEmpty(typeDefs)) { + TypeDefMetadata metadata = typeDefMetadataMap.get(typeClass); + if (metadata != null) { + long latestVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(metadata.redisKey, latestVersionStr); + metadata.setCurrentVersion.accept(latestVersion); + } + } } - public void setStatus(int status) { - this.status = status; - } + public void updateVersion(RedisService redisService, AtlasBaseTypeDef atlasBaseTypeDef) { + if (atlasBaseTypeDef == null) { + return; + } - public Map getHeaders() { - return headers; + TypeDefMetadata metadata = typeDefMetadataMap.get(atlasBaseTypeDef.getClass()); + if (metadata != null) { + long latestVersion = Long.parseLong(redisService.getValue(metadata.redisKey, "1")) + 1; + String latestVersionStr = String.valueOf(latestVersion); + redisService.putValue(metadata.redisKey, latestVersionStr); + metadata.setCurrentVersion.accept(latestVersion); + } } - public void setHeaders(Map headers) { - this.headers = headers; - } } - -class CacheRefresherHealthResponse { - private String message; - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } -} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 2a867452b67..5ea59b12067 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -50,6 +50,7 @@ import org.apache.atlas.repository.patches.SuperTypesUpdatePatch; import org.apache.atlas.repository.patches.AtlasPatchManager; import org.apache.atlas.repository.patches.AtlasPatchRegistry; +import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; @@ -101,15 +102,23 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private final Configuration conf; private final AtlasGraph graph; private final AtlasPatchManager patchManager; + private final RedisService redisService; + private static long CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION; + private static long CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION; @Inject public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, - AtlasGraph graph, Configuration conf, AtlasPatchManager patchManager) throws AtlasBaseException { + AtlasGraph graph, Configuration conf, AtlasPatchManager patchManager, RedisService redisService) throws AtlasBaseException { this.typeDefStore = typeDefStore; this.typeRegistry = typeRegistry; this.conf = conf; this.graph = graph; this.patchManager = patchManager; + this.redisService = redisService; } @PostConstruct @@ -118,6 +127,12 @@ public void init() { if (!HAConfiguration.isHAEnabled(conf)) { startInternal(); + CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENUM_CACHE_LATEST_VERSION, "1")); + CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_BUSINESS_METADATA_CACHE_LATEST_VERSION, "1")); + CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_CLASSIFICATION_METADATA_CACHE_LATEST_VERSION, "1")); + CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_STRUCT_CACHE_LATEST_VERSION, "1")); + CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_ENTITY_CACHE_LATEST_VERSION, "1")); + CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION = Long.parseLong(redisService.getValue(Constants.TYPEDEF_RELATIONSHIP_CACHE_LATEST_VERSION, "1")); } else { LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation"); } @@ -407,6 +422,54 @@ public int getHandlerOrder() { return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder(); } + public static long getCurrentEnumTypedefInternalVersion() { + return CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentEnumTypedefInternalVersion(long version) { + CURRENT_ENUM_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentBMTypedefInternalVersion() { + return CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentBMTypedefInternalVersion(long version) { + CURRENT_BUSINESS_METADATA_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentClassificationTypedefInternalVersion() { + return CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentClassificationTypedefInternalVersion(long version) { + CURRENT_CLASSIFICATION_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentStructTypedefInternalVersion() { + return CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentStructTypedefInternalVersion(long version) { + CURRENT_STRUCT_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentEntityTypedefInternalVersion() { + return CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentEntityTypedefInternalVersion(long version) { + CURRENT_ENTITY_TYPEDEF_INTERNAL_VERSION = version; + } + + public static long getCurrentRelationshipTypedefInternalVersion() { + return CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION; + } + + public static void setCurrentRelationshipTypedefInternalVersion(long version) { + CURRENT_RELATIONSHIP_TYPEDEF_INTERNAL_VERSION = version; + } + private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index cdc06fbc08a..943336a47de 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -123,17 +123,135 @@ public void init() throws AtlasBaseException { } @Override - public void initWithoutLock() throws AtlasBaseException { - // need even better approach than this - AtlasTypesDef typesDef = new AtlasTypesDef(getEnumDefStore(typeRegistry).getAll(), - getStructDefStore(typeRegistry).getAll(), - getClassificationDefStore(typeRegistry).getAll(), - getEntityDefStore(typeRegistry).getAll(), - getRelationshipDefStore(typeRegistry).getAll(), - getBusinessMetadataDefStore(typeRegistry).getAll()); - - rectifyTypeErrorsIfAny(typesDef); - typeRegistry.addTypes(typesDef); + public void reloadEnumTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadEnumTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List enumDefs = getEnumDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllEnumDefs()) { + if (atlasBaseTypeDef instanceof AtlasEnumDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(enumDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadEnumTypeDefs()"); + } + } + + @Override + public void reloadStructTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadStructTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List structDefs = getStructDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllStructDefs()) { + if (atlasBaseTypeDef instanceof AtlasStructDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(structDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadStructTypeDefs()"); + } + } + + @Override + public void reloadEntityTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadEntityTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List entityDefs = getEntityDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllEntityDefs()) { + if (atlasBaseTypeDef instanceof AtlasEntityDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(entityDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadEntityTypeDefs()"); + } + } + + @Override + public void reloadRelationshipTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadRelationshipTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List relationshipDefs = getRelationshipDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllRelationshipDefs()) { + if (atlasBaseTypeDef instanceof AtlasRelationshipDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(relationshipDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadRelationshipTypeDefs()"); + } + } + + @Override + public void reloadBusinessMetadataTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadBusinessMetadataTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List businessMetadataDefs = getBusinessMetadataDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllBusinessMetadataDefs()) { + if (atlasBaseTypeDef instanceof AtlasBusinessMetadataDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(businessMetadataDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadBusinessMetadataTypeDefs()"); + } + } + + @Override + public void reloadClassificationMetadataTypeDefs() throws AtlasBaseException { + LOG.info("==> AtlasTypeDefGraphStore.reloadClassificationMetadataTypeDefs()"); + AtlasTransientTypeRegistry ttr = null; + boolean commitUpdates = false; + + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + List classificationDefs = getClassificationDefStore(ttr).getAll(); + for (AtlasBaseTypeDef atlasBaseTypeDef : ttr.getAllClassificationDefs()) { + if (atlasBaseTypeDef instanceof AtlasClassificationDef) { + ttr.removeTypeByName(atlasBaseTypeDef.getName()); + } + } + ttr.addTypes(classificationDefs); + commitUpdates = true; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, commitUpdates); + LOG.info("<== AtlasTypeDefGraphStore.reloadClassificationMetadataTypeDefs()"); + } } @Override @@ -713,7 +831,7 @@ public void deleteTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException { @Override @GraphTransaction - public void deleteTypeByName(String typeName) throws AtlasBaseException { + public AtlasBaseTypeDef deleteTypeByName(String typeName) throws AtlasBaseException { AtlasType atlasType = typeRegistry.getType(typeName); if (atlasType == null) { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS.TYPE_NAME_NOT_FOUND, typeName); @@ -737,52 +855,58 @@ public void deleteTypeByName(String typeName) throws AtlasBaseException { } deleteTypesDef(typesDef); + return baseTypeDef; } @Override public AtlasTypesDef searchTypesDef(SearchFilter searchFilter) throws AtlasBaseException { final AtlasTypesDef typesDef = new AtlasTypesDef(); Predicate searchPredicates = FilterUtil.getPredicateFromSearchFilter(searchFilter); - - for(AtlasEnumType enumType : typeRegistry.getAllEnumTypes()) { - if (searchPredicates.evaluate(enumType)) { - typesDef.getEnumDefs().add(enumType.getEnumDef()); + AtlasTransientTypeRegistry ttr = null; + try { + ttr = typeRegistry.lockTypeRegistryForUpdate(5); + for (AtlasEnumType enumType : ttr.getAllEnumTypes()) { + if (searchPredicates.evaluate(enumType)) { + typesDef.getEnumDefs().add(enumType.getEnumDef()); + } } - } - for(AtlasStructType structType : typeRegistry.getAllStructTypes()) { - if (searchPredicates.evaluate(structType)) { - typesDef.getStructDefs().add(structType.getStructDef()); + for (AtlasStructType structType : ttr.getAllStructTypes()) { + if (searchPredicates.evaluate(structType)) { + typesDef.getStructDefs().add(structType.getStructDef()); + } } - } - for(AtlasClassificationType classificationType : typeRegistry.getAllClassificationTypes()) { - if (searchPredicates.evaluate(classificationType)) { - typesDef.getClassificationDefs().add(classificationType.getClassificationDef()); + for (AtlasClassificationType classificationType : ttr.getAllClassificationTypes()) { + if (searchPredicates.evaluate(classificationType)) { + typesDef.getClassificationDefs().add(classificationType.getClassificationDef()); + } } - } - for(AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) { - if (searchPredicates.evaluate(entityType)) { - typesDef.getEntityDefs().add(entityType.getEntityDef()); + for (AtlasEntityType entityType : ttr.getAllEntityTypes()) { + if (searchPredicates.evaluate(entityType)) { + typesDef.getEntityDefs().add(entityType.getEntityDef()); + } } - } - for(AtlasRelationshipType relationshipType : typeRegistry.getAllRelationshipTypes()) { - if (searchPredicates.evaluate(relationshipType)) { - typesDef.getRelationshipDefs().add(relationshipType.getRelationshipDef()); + for (AtlasRelationshipType relationshipType : ttr.getAllRelationshipTypes()) { + if (searchPredicates.evaluate(relationshipType)) { + typesDef.getRelationshipDefs().add(relationshipType.getRelationshipDef()); + } } - } - for(AtlasBusinessMetadataType businessMetadataType : typeRegistry.getAllBusinessMetadataTypes()) { - if (searchPredicates.evaluate(businessMetadataType)) { - typesDef.getBusinessMetadataDefs().add(businessMetadataType.getBusinessMetadataDef()); + for (AtlasBusinessMetadataType businessMetadataType : ttr.getAllBusinessMetadataTypes()) { + if (searchPredicates.evaluate(businessMetadataType)) { + typesDef.getBusinessMetadataDefs().add(businessMetadataType.getBusinessMetadataDef()); + } } - } - AtlasAuthorizationUtils.filterTypesDef(new AtlasTypesDefFilterRequest(typesDef)); + AtlasAuthorizationUtils.filterTypesDef(new AtlasTypesDefFilterRequest(typesDef)); - return typesDef; + return typesDef; + } finally { + typeRegistry.releaseTypeRegistryForUpdate(ttr, false); + } } @Override diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java index 3d8ae59ed07..97a05a4aa8f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/SoftDeleteHandlerV1.java @@ -39,9 +39,7 @@ import java.util.Collection; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; -import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; -import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.GraphHelper.*; public class SoftDeleteHandlerV1 extends DeleteHandlerV1 { @@ -77,8 +75,15 @@ protected void deleteEdge(AtlasEdge edge, boolean force) throws AtlasBaseExcepti LOG.debug("==> SoftDeleteHandlerV1.deleteEdge({}, {})", GraphHelper.string(edge), force); } - if (edge == null || getTypeName(edge) == null) { - LOG.warn("Edge is null or its typeName is empty. Nothing to delete"); + if (edge == null) { + LOG.warn("Edge is null. Nothing to delete"); + return; + } + + //tag vertex do not have typeName, but they have a label + if (!CLASSIFICATION_LABEL.equalsIgnoreCase(edge.getLabel()) + && getTypeName(edge) == null) { + LOG.warn("Edge is not a tag type and typeName is empty. Nothing to delete"); return; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java index 4cc36109f99..35d8255eb90 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasTypeDefGraphStoreV2.java @@ -22,9 +22,12 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.authorize.AtlasTypesDefFilterRequest; +import org.apache.atlas.authorizer.AtlasAuthorizationUtils; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.TypeDefChangeListener; +import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.typedef.*; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graph.GraphHelper; @@ -34,11 +37,11 @@ import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasDefStore; import org.apache.atlas.repository.store.graph.AtlasTypeDefGraphStore; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.repository.util.FilterUtil; +import org.apache.atlas.type.*; import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.Predicate; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,16 +126,6 @@ public void init() throws AtlasBaseException { LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); } - @Override - @GraphTransaction - public void initWithoutLock() throws AtlasBaseException { - LOG.info("==> AtlasTypeDefGraphStoreV1.initWithoutLock()"); - - super.initWithoutLock(); - - LOG.info("<== AtlasTypeDefGraphStoreV1.initWithoutLock()"); - } - AtlasGraph getAtlasGraph() { return atlasGraph; } @VisibleForTesting diff --git a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java index 72ada05a786..990a637d2fa 100644 --- a/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java +++ b/repository/src/main/java/org/apache/atlas/store/AtlasTypeDefStore.java @@ -35,7 +35,17 @@ public interface AtlasTypeDefStore { void init() throws AtlasBaseException; - void initWithoutLock() throws AtlasBaseException; + void reloadEnumTypeDefs() throws AtlasBaseException; + + void reloadBusinessMetadataTypeDefs() throws AtlasBaseException; + + void reloadClassificationMetadataTypeDefs() throws AtlasBaseException; + + void reloadStructTypeDefs() throws AtlasBaseException; + + void reloadEntityTypeDefs() throws AtlasBaseException; + + void reloadRelationshipTypeDefs() throws AtlasBaseException; /* EnumDef operations */ @@ -112,7 +122,7 @@ AtlasClassificationDef updateClassificationDefByGuid(String guid, AtlasClassific AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException; - void deleteTypeByName(String typeName) throws AtlasBaseException; + AtlasBaseTypeDef deleteTypeByName(String typeName) throws AtlasBaseException; void notifyLoadCompletion(); } diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java index efccf0828d5..3654fca5029 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypeCacheRefreshREST.java @@ -72,12 +72,12 @@ private synchronized void refreshTypeDef(int expectedFieldKeys,final String trac int currentSize = provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size(); LOG.info("Size of field keys before refresh = {} :: traceId {}", currentSize,traceId); - long totalWaitTimeInMillis = 15 * 1000;//15 seconds + long totalWaitTimeInMillis = 5 * 1000;//5 seconds long sleepTimeInMillis = 500; - long totalIterationsAllowed = Math.floorDiv(totalWaitTimeInMillis, sleepTimeInMillis); + long totalIterationsAllowed = 5; int counter = 0; - while (currentSize != expectedFieldKeys && counter++ < totalIterationsAllowed) { + while (currentSize < expectedFieldKeys && counter++ < totalIterationsAllowed) { currentSize = provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size(); LOG.info("field keys size found = {} at iteration {} :: traceId {}", currentSize, counter, traceId); Thread.sleep(sleepTimeInMillis); @@ -91,7 +91,7 @@ private synchronized void refreshTypeDef(int expectedFieldKeys,final String trac LOG.info("Found desired size of fieldKeys in iteration {} :: traceId {}", counter, traceId); } //Reload in-memory cache of type-registry - typeDefStore.initWithoutLock(); + typeDefStore.init(); LOG.info("Size of field keys after refresh = {}", provider.get().getManagementSystem().getGraphIndex(VERTEX_INDEX).getFieldKeys().size()); LOG.info("Completed type-def cache refresh :: traceId {}", traceId); diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index 89caf89f497..1c07e0becea 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -25,8 +25,8 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.SearchFilter; import org.apache.atlas.model.typedef.*; -import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.graph.TypeCacheRefresher; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.repository.util.FilterUtil; import org.apache.atlas.service.redis.RedisService; import org.apache.atlas.store.AtlasTypeDefStore; @@ -47,8 +47,6 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -60,7 +58,6 @@ import static org.apache.atlas.AtlasErrorCode.APPLICABLE_ENTITY_TYPES_DELETION_NOT_SUPPORTED; import static org.apache.atlas.AtlasErrorCode.ATTRIBUTE_DELETION_NOT_SUPPORTED; -import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchQuery.CLIENT_ORIGIN_PRODUCT; import static org.apache.atlas.web.filters.AuditFilter.X_ATLAN_CLIENT_ORIGIN; /** @@ -114,7 +111,7 @@ public TypesREST(AtlasTypeDefStore typeDefStore, RedisService redisService, Conf @Timed public AtlasBaseTypeDef getTypeDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBaseTypeDef ret = typeDefStore.getByName(name); return ret; @@ -132,7 +129,7 @@ public AtlasBaseTypeDef getTypeDefByName(@PathParam("name") String name) throws @Timed public AtlasBaseTypeDef getTypeDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBaseTypeDef ret = typeDefStore.getByGuid(guid); return ret; @@ -150,7 +147,7 @@ public AtlasBaseTypeDef getTypeDefByGuid(@PathParam("guid") String guid) throws @Timed public List getTypeDefHeaders(@Context HttpServletRequest httpServletRequest) throws AtlasBaseException { SearchFilter searchFilter = getSearchFilter(httpServletRequest); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasTypesDef searchTypesDef = typeDefStore.searchTypesDef(searchFilter); return AtlasTypeUtil.toTypeDefHeader(searchTypesDef); @@ -167,7 +164,7 @@ public List getTypeDefHeaders(@Context HttpServletRequest ht @Timed public AtlasTypesDef getAllTypeDefs(@Context HttpServletRequest httpServletRequest) throws AtlasBaseException { SearchFilter searchFilter = getSearchFilter(httpServletRequest); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasTypesDef typesDef = typeDefStore.searchTypesDef(searchFilter); return typesDef; @@ -186,7 +183,6 @@ public AtlasTypesDef getAllTypeDefs(@Context HttpServletRequest httpServletReque @Timed public AtlasEnumDef getEnumDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasEnumDef ret = typeDefStore.getEnumDefByName(name); return ret; @@ -205,7 +201,6 @@ public AtlasEnumDef getEnumDefByName(@PathParam("name") String name) throws Atla @Timed public AtlasEnumDef getEnumDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasEnumDef ret = typeDefStore.getEnumDefByGuid(guid); return ret; @@ -225,7 +220,6 @@ public AtlasEnumDef getEnumDefByGuid(@PathParam("guid") String guid) throws Atla @Timed public AtlasStructDef getStructDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasStructDef ret = typeDefStore.getStructDefByName(name); return ret; @@ -244,7 +238,6 @@ public AtlasStructDef getStructDefByName(@PathParam("name") String name) throws @Timed public AtlasStructDef getStructDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasStructDef ret = typeDefStore.getStructDefByGuid(guid); return ret; @@ -263,7 +256,7 @@ public AtlasStructDef getStructDefByGuid(@PathParam("guid") String guid) throws @Timed public AtlasClassificationDef getClassificationDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasClassificationDef ret = typeDefStore.getClassificationDefByName(name); return ret; @@ -282,7 +275,7 @@ public AtlasClassificationDef getClassificationDefByName(@PathParam("name") Stri @Timed public AtlasClassificationDef getClassificationDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasClassificationDef ret = typeDefStore.getClassificationDefByGuid(guid); return ret; @@ -301,7 +294,6 @@ public AtlasClassificationDef getClassificationDefByGuid(@PathParam("guid") Stri @Timed public AtlasEntityDef getEntityDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasEntityDef ret = typeDefStore.getEntityDefByName(name); return ret; @@ -320,7 +312,6 @@ public AtlasEntityDef getEntityDefByName(@PathParam("name") String name) throws @Timed public AtlasEntityDef getEntityDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasEntityDef ret = typeDefStore.getEntityDefByGuid(guid); return ret; @@ -338,7 +329,6 @@ public AtlasEntityDef getEntityDefByGuid(@PathParam("guid") String guid) throws @Timed public AtlasRelationshipDef getRelationshipDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - AtlasRelationshipDef ret = typeDefStore.getRelationshipDefByName(name); return ret; @@ -357,7 +347,6 @@ public AtlasRelationshipDef getRelationshipDefByName(@PathParam("name") String n @Timed public AtlasRelationshipDef getRelationshipDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - AtlasRelationshipDef ret = typeDefStore.getRelationshipDefByGuid(guid); return ret; @@ -376,7 +365,7 @@ public AtlasRelationshipDef getRelationshipDefByGuid(@PathParam("guid") String g @Timed public AtlasBusinessMetadataDef getBusinessMetadataDefByGuid(@PathParam("guid") String guid) throws AtlasBaseException { Servlets.validateQueryParamLength("guid", guid); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBusinessMetadataDef ret = typeDefStore.getBusinessMetadataDefByGuid(guid); return ret; @@ -395,28 +384,12 @@ public AtlasBusinessMetadataDef getBusinessMetadataDefByGuid(@PathParam("guid") @Timed public AtlasBusinessMetadataDef getBusinessMetadataDefByName(@PathParam("name") String name) throws AtlasBaseException { Servlets.validateQueryParamLength("name", name); - + typeCacheRefresher.refreshCacheIfNeeded(redisService); AtlasBusinessMetadataDef ret = typeDefStore.getBusinessMetadataDefByName(name); return ret; } - private void attemptAcquiringLock() throws AtlasBaseException { - final String traceId = RequestContext.get().getTraceId(); - try { - if (!redisService.acquireDistributedLock(typeDefLock)) { - LOG.info("Lock is already acquired. Returning now :: traceId {}", traceId); - throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK); - } - LOG.info("successfully acquired lock :: traceId {}", traceId); - } catch (AtlasBaseException e) { - throw e; - } catch (Exception e) { - LOG.error("Error while acquiring lock on type-defs :: traceId " + traceId + " ." + e.getMessage(), e); - throw new AtlasBaseException("Error while acquiring a lock on type-defs"); - } - } - private Lock attemptAcquiringLockV2() throws AtlasBaseException { final String traceId = RequestContext.get().getTraceId(); Lock lock = null; @@ -459,8 +432,9 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ AtlasPerfTracer perf = null; validateBuiltInTypeNames(typesDef); RequestContext.get().setTraceId(UUID.randomUUID().toString()); + AtlasTypesDef atlasTypesDef = null; try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.createAtlasTypeDefs(" + AtlasTypeUtil.toDebugString(typesDef) + ")"); @@ -470,7 +444,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ typesDef.getBusinessMetadataDefs().forEach(AtlasBusinessMetadataDef::setRandomNameForEntityAndAttributeDefs); typesDef.getClassificationDefs().forEach(AtlasClassificationDef::setRandomNameForEntityAndAttributeDefs); String clientOrigin = servletRequest.getHeader(X_ATLAN_CLIENT_ORIGIN); - AtlasTypesDef atlasTypesDef = createTypeDefsWithRetry(typesDef, clientOrigin); + atlasTypesDef = createTypeDefsWithRetry(typesDef, clientOrigin); return atlasTypesDef; } catch (AtlasBaseException atlasBaseException) { LOG.error("TypesREST.createAtlasTypeDefs:: " + atlasBaseException.getMessage(), atlasBaseException); @@ -481,6 +455,7 @@ public AtlasTypesDef createAtlasTypeDefs(@Context HttpServletRequest servletRequ } finally { if (lock != null) { + typeCacheRefresher.updateVersion(redisService, atlasTypesDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -503,11 +478,11 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ @QueryParam("allowDuplicateDisplayName") @DefaultValue("false") boolean allowDuplicateDisplayName) throws AtlasBaseException { AtlasPerfTracer perf = null; validateBuiltInTypeNames(typesDef); - validateTypeNameExists(typesDef); RequestContext.get().setTraceId(UUID.randomUUID().toString()); Lock lock = null; + AtlasTypesDef atlasTypesDef = null; try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.updateAtlasTypeDefs(" + AtlasTypeUtil.toDebugString(typesDef) + ")"); @@ -537,7 +512,7 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ RequestContext.get().setAllowDuplicateDisplayName(allowDuplicateDisplayName); LOG.info("TypesRest.updateAtlasTypeDefs:: Typedef patch enabled:" + patch); String clientOrigin = servletRequest.getHeader(X_ATLAN_CLIENT_ORIGIN); - AtlasTypesDef atlasTypesDef = updateTypeDefsWithRetry(typesDef, clientOrigin); + atlasTypesDef = updateTypeDefsWithRetry(typesDef, clientOrigin); LOG.info("TypesRest.updateAtlasTypeDefs:: Done"); return atlasTypesDef; } catch (AtlasBaseException atlasBaseException) { @@ -548,6 +523,7 @@ public AtlasTypesDef updateAtlasTypeDefs(@Context HttpServletRequest servletRequ throw new AtlasBaseException("Error while updating a type definition"); } finally { if (lock != null) { + typeCacheRefresher.updateVersion(redisService, atlasTypesDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } RequestContext.clear(); @@ -571,9 +547,8 @@ public void deleteAtlasTypeDefs(@Context HttpServletRequest servletRequest, fina Lock lock = null; RequestContext.get().setTraceId(UUID.randomUUID().toString()); validateBuiltInTypeNames(typesDef); - validateTypeNameExists(typesDef); try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.deleteAtlasTypeDefs(" + AtlasTypeUtil.toDebugString(typesDef) + ")"); @@ -589,6 +564,7 @@ public void deleteAtlasTypeDefs(@Context HttpServletRequest servletRequest, fina throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { + typeCacheRefresher.updateVersion(redisService, typesDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -609,13 +585,14 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) AtlasPerfTracer perf = null; Lock lock = null; RequestContext.get().setTraceId(UUID.randomUUID().toString()); + AtlasBaseTypeDef atlasBaseTypeDef = null; try { - typeCacheRefresher.verifyCacheRefresherHealth(); + typeCacheRefresher.refreshCacheIfNeeded(redisService); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TypesREST.deleteAtlasTypeByName(" + typeName + ")"); } lock = attemptAcquiringLockV2(); - deleteTypeByNameWithRetry(typeName); + atlasBaseTypeDef = deleteTypeByNameWithRetry(typeName); } catch (AtlasBaseException atlasBaseException) { LOG.error("TypesREST.deleteAtlasTypeByName:: " + atlasBaseException.getMessage(), atlasBaseException); throw atlasBaseException; @@ -624,6 +601,7 @@ public void deleteAtlasTypeByName(@PathParam("typeName") final String typeName) throw new AtlasBaseException("Error while deleting a type definition"); } finally { if (lock != null) { + typeCacheRefresher.updateVersion(redisService, atlasBaseTypeDef); redisService.releaseDistributedLockV2(lock, typeDefLock); } AtlasPerfTracer.log(perf); @@ -648,45 +626,6 @@ private void validateBuiltInTypeNames(AtlasTypesDef typesDef) throws AtlasBaseEx } } - private void validateTypeNames(AtlasTypesDef typesDef) throws AtlasBaseException { - if (CollectionUtils.isNotEmpty(typesDef.getEnumDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getEnumDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getEntityDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getEntityDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getStructDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getStructDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getClassificationDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getClassificationDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - if (CollectionUtils.isNotEmpty(typesDef.getBusinessMetadataDefs())) { - for (AtlasBaseTypeDef typeDef : typesDef.getBusinessMetadataDefs()) { - typeDefStore.getByName(typeDef.getName()); - } - } - } - - private void validateTypeNameExists(AtlasTypesDef typesDef) throws AtlasBaseException { - try { - validateTypeNames(typesDef); - } catch (AtlasBaseException e) { - if(AtlasErrorCode.TYPE_NAME_NOT_FOUND.equals(e.getAtlasErrorCode())) { - typeDefStore.initWithoutLock(); - validateTypeNames(typesDef); - } - } - } - /** * Populate a SearchFilter on the basis of the Query Parameters * @return @@ -717,53 +656,6 @@ private SearchFilter getSearchFilter(HttpServletRequest httpServletRequest) { return ret; } - private void refreshAllHostCache(String traceId, String clientOrigin){ - if (CLIENT_ORIGIN_PRODUCT.equals(clientOrigin) && AtlasConfiguration.ENABLE_ASYNC_TYPE_UPDATE.getBoolean()){ - cacheRefreshExecutor.submit(() -> { - RequestContext.get().setTraceId(traceId); - try { - int maxRetries = 5; - int retryCount = 0; - boolean success = false; - - while (!success && retryCount < maxRetries) { - try { - typeCacheRefresher.refreshAllHostCache(); - LOG.info("TypesRest.updateAtlasTypeDefs:: Typedef refreshed successfully on attempt {}", retryCount + 1); - success = true; - } catch (IOException | URISyntaxException | RepositoryException e) { - retryCount++; - if (retryCount >= maxRetries) { - LOG.error("TypesRest.updateAtlasTypeDefs:: Failed to refresh typedef after {} attempts", maxRetries, e); - break; - } - - // Exponential backoff: wait longer between each retry - long waitTimeMs = 1000 * (long)Math.pow(2, retryCount - 1); - LOG.warn("TypesRest.updateAtlasTypeDefs:: Retry attempt {} after {} ms", retryCount, waitTimeMs); - - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("Retry interrupted", ie); - break; - } - } - } - } catch (Exception e) { - LOG.error("Unexpected error in async cache refresh", e); - } - }); - } else { - try { - typeCacheRefresher.refreshAllHostCache(); - } catch (IOException | URISyntaxException | RepositoryException e) { - LOG.error("Error while refreshing all host cache", e); - } - } - } - @PreDestroy public void cleanUp() { if (cacheRefreshExecutor != null) { @@ -794,7 +686,6 @@ private AtlasTypesDef createTypeDefsWithRetry(AtlasTypesDef typesDef, String cli // Perform the creation AtlasTypesDef result = typeDefStore.createTypesDef(typesDef); - refreshAllHostCache(RequestContext.get().getTraceId(), clientOrigin); LOG.info("Successfully created typedefs on attempt {}", attempt); return result; @@ -841,7 +732,6 @@ private AtlasTypesDef updateTypeDefsWithRetry(AtlasTypesDef typesDef, String cli } AtlasTypesDef result = typeDefStore.updateTypesDef(typesDef); - refreshAllHostCache(RequestContext.get().getTraceId(), clientOrigin); LOG.info("Successfully updated typedefs on attempt {}", attempt); return result; @@ -883,7 +773,6 @@ private void deleteTypeDefsWithRetry(AtlasTypesDef typesDef, String clientOrigin try { // Perform the deletion typeDefStore.deleteTypesDef(typesDef); - refreshAllHostCache(RequestContext.get().getTraceId(), clientOrigin); LOG.info("Successfully deleted typedefs on attempt {}", attempt); return; @@ -915,16 +804,15 @@ private void deleteTypeDefsWithRetry(AtlasTypesDef typesDef, String clientOrigin throw lastException; // Should never reach here, but for completeness } - private void deleteTypeByNameWithRetry(String typeName) throws AtlasBaseException { + private AtlasBaseTypeDef deleteTypeByNameWithRetry(String typeName) throws AtlasBaseException { AtlasBaseException lastException = null; for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) { try { // Perform the deletion - typeDefStore.deleteTypeByName(typeName); - refreshAllHostCache(RequestContext.get().getTraceId(), null); + AtlasBaseTypeDef atlasBaseTypeDef = typeDefStore.deleteTypeByName(typeName); LOG.info("Successfully deleted typedef '{}' on attempt {}", typeName, attempt); - return; + return atlasBaseTypeDef; } catch (AtlasBaseException e) { lastException = e;