diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java index 036681eb6cb..a33733641d7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/ESBasedAuditRepository.java @@ -65,6 +65,7 @@ import static java.nio.charset.Charset.defaultCharset; import static org.apache.atlas.repository.Constants.DOMAIN_GUIDS; +import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.INDEX_BACKEND_CONF; import static org.springframework.util.StreamUtils.copyToString; /** @@ -76,7 +77,7 @@ @ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepositorySearch.impl") public class ESBasedAuditRepository extends AbstractStorageBasedAuditRepository { private static final Logger LOG = LoggerFactory.getLogger(ESBasedAuditRepository.class); - public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.hostname"; + public static final String INDEX_WRITE_BACKEND_CONF = "atlas.graph.index.search.write.hostname"; private static final String TOTAL_FIELD_LIMIT = "atlas.index.audit.elasticsearch.total_field_limit"; public static final String INDEX_NAME = "entity_audits"; private static final String ENTITYID = "entityId"; @@ -547,8 +548,7 @@ private void setLowLevelClient() throws AtlasException { public static List getHttpHosts() throws AtlasException { List httpHosts = new ArrayList<>(); - Configuration configuration = ApplicationProperties.get(); - String indexConf = configuration.getString(INDEX_BACKEND_CONF); + String indexConf = getESHosts(); String[] hosts = indexConf.split(","); for (String host : hosts) { host = host.trim(); @@ -564,6 +564,17 @@ public static List getHttpHosts() throws AtlasException { return httpHosts; } + public static String getESHosts() throws AtlasException { + Configuration configuration = ApplicationProperties.get(); + //get es write hosts if available (ES Isolation) + String esHostNames = configuration.getString(INDEX_WRITE_BACKEND_CONF); + if (StringUtils.isNotEmpty(esHostNames)) { + return esHostNames; + } else { + return configuration.getString(INDEX_BACKEND_CONF); + } + } + private boolean isSuccess(Response response) { return response.getStatusLine().getStatusCode() == 200; } diff --git a/webapp/src/main/java/org/apache/atlas/Atlas.java b/webapp/src/main/java/org/apache/atlas/Atlas.java index 411a1b1c70f..eafe0d6d5e2 100755 --- a/webapp/src/main/java/org/apache/atlas/Atlas.java +++ b/webapp/src/main/java/org/apache/atlas/Atlas.java @@ -321,7 +321,7 @@ private static void installLogBridge() { SLF4JBridgeHandler.install(); } - private static void initAccessAuditElasticSearch(Configuration configuration) throws IOException { + private static void initAccessAuditElasticSearch(Configuration configuration) throws IOException, AtlasException { AccessAuditLogsIndexCreator indexCreator = new AccessAuditLogsIndexCreator(configuration); indexCreator.start(); } diff --git a/webapp/src/main/java/org/apache/atlas/util/AccessAuditLogsIndexCreator.java b/webapp/src/main/java/org/apache/atlas/util/AccessAuditLogsIndexCreator.java index 3f005421202..a570604a211 100644 --- a/webapp/src/main/java/org/apache/atlas/util/AccessAuditLogsIndexCreator.java +++ b/webapp/src/main/java/org/apache/atlas/util/AccessAuditLogsIndexCreator.java @@ -17,13 +17,12 @@ package org.apache.atlas.util; import org.apache.atlas.AtlasConfiguration; -import org.apache.atlas.audit.utils.CredentialsProviderUtil; +import org.apache.atlas.AtlasException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.client.CredentialsProvider; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.elasticsearch.client.Request; @@ -42,11 +41,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; -import java.util.StringTokenizer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.atlas.repository.audit.ESBasedAuditRepository.INDEX_BACKEND_CONF; +import static org.apache.atlas.repository.audit.ESBasedAuditRepository.INDEX_WRITE_BACKEND_CONF; +import static org.apache.atlas.repository.audit.ESBasedAuditRepository.getESHosts; public class AccessAuditLogsIndexCreator extends Thread { private static final Logger LOG = LoggerFactory.getLogger(AccessAuditLogsIndexCreator.class); @@ -88,13 +87,13 @@ public class AccessAuditLogsIndexCreator extends Thread { private int no_of_shards; private boolean is_completed = false; - public AccessAuditLogsIndexCreator(Configuration configuration) throws IOException { + public AccessAuditLogsIndexCreator(Configuration configuration) throws IOException, AtlasException { LOG.debug("Starting Ranger audit schema setup in ElasticSearch."); time_interval = configuration.getLong(ES_TIME_INTERVAL, DEFAULT_ES_TIME_INTERVAL_MS); user = configuration.getString(ES_CONFIG_USERNAME); - hosts = getHttpHosts(configuration); - port = getPort(configuration); + hosts = getHttpHosts(); + port = getPort(); protocol = configuration.getString(ES_CONFIG_PROTOCOL, "http"); index = configuration.getString(ES_CONFIG_INDEX, DEFAULT_INDEX_NAME); @@ -156,7 +155,7 @@ public void run() { } } } else { - LOG.error("elasticsearch hosts values are empty. Please set property " + INDEX_BACKEND_CONF); + LOG.error("elasticsearch hosts values are empty. Please set property "+ INDEX_WRITE_BACKEND_CONF); } } @@ -198,28 +197,6 @@ private void createClient() { } } - public static RestClientBuilder getRestClientBuilder(String urls, String protocol, String user, String password, int port) { - RestClientBuilder restClientBuilder = RestClient.builder( - toArray(urls, ",").stream() - .map(x -> new HttpHost(x, port, protocol)) - .toArray(i -> new HttpHost[i]) - ); - if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && !user.equalsIgnoreCase("NONE") && !password.equalsIgnoreCase("NONE")) { - - final CredentialsProvider credentialsProvider = - CredentialsProviderUtil.getBasicCredentials(user, password); - restClientBuilder.setHttpClientConfigCallback(clientBuilder -> - clientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - - } else { - LOG.error("ElasticSearch Credentials not provided!!"); - final CredentialsProvider credentialsProvider = null; - restClientBuilder.setHttpClientConfigCallback(clientBuilder -> - clientBuilder.setDefaultCredentialsProvider(credentialsProvider)); - } - return restClientBuilder; - } - private boolean createIndex() { boolean exists = false; if (client == null) { @@ -295,23 +272,10 @@ private void logErrorMessageAndWait(String msg, Exception exception) { } } - private static String getHosts(Configuration configuration) { - StringBuilder urls = new StringBuilder(); - String indexConf = configuration.getString(INDEX_BACKEND_CONF); - String[] hosts = indexConf.split(","); - for (String host : hosts) { - host = host.trim(); - String[] hostAndPort = host.split(":"); - urls.append(hostAndPort[0]); - - } - return urls.toString(); - } - - public static List getHttpHosts(Configuration configuration) { + public static List getHttpHosts() throws AtlasException { List httpHosts = new ArrayList<>(); - String indexConf = configuration.getString(INDEX_BACKEND_CONF); + String indexConf = getESHosts(); String[] hosts = indexConf.split(","); for (String host : hosts) { host = host.trim(); @@ -326,10 +290,9 @@ public static List getHttpHosts(Configuration configuration) { } - private static int getPort(Configuration configuration) { + private static int getPort() throws AtlasException { int port = 9200; - StringBuilder urls = new StringBuilder(); - String indexConf = configuration.getString(INDEX_BACKEND_CONF); + String indexConf = getESHosts(); try { String[] hosts = indexConf.split(","); String host = hosts[0]; @@ -342,15 +305,4 @@ private static int getPort(Configuration configuration) { return port; } - - public static List toArray(String destListStr, String delim) { - List list = new ArrayList(); - if (StringUtils.isNotBlank(destListStr)) { - StringTokenizer tokenizer = new StringTokenizer(destListStr, delim.trim()); - while (tokenizer.hasMoreTokens()) { - list.add(tokenizer.nextToken()); - } - } - return list; - } }