Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

import static java.nio.charset.Charset.defaultCharset;
import static org.apache.atlas.repository.Constants.DOMAIN_GUIDS;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.INDEX_BACKEND_CONF;
import static org.springframework.util.StreamUtils.copyToString;

/**
Expand All @@ -76,7 +77,7 @@
@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepositorySearch.impl")
public class ESBasedAuditRepository extends AbstractStorageBasedAuditRepository {
private static final Logger LOG = LoggerFactory.getLogger(ESBasedAuditRepository.class);
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.hostname";
public static final String INDEX_WRITE_BACKEND_CONF = "atlas.graph.index.search.write.hostname";
private static final String TOTAL_FIELD_LIMIT = "atlas.index.audit.elasticsearch.total_field_limit";
public static final String INDEX_NAME = "entity_audits";
private static final String ENTITYID = "entityId";
Expand Down Expand Up @@ -547,8 +548,7 @@ private void setLowLevelClient() throws AtlasException {

public static List<HttpHost> getHttpHosts() throws AtlasException {
List<HttpHost> httpHosts = new ArrayList<>();
Configuration configuration = ApplicationProperties.get();
String indexConf = configuration.getString(INDEX_BACKEND_CONF);
String indexConf = getESHosts();
String[] hosts = indexConf.split(",");
for (String host : hosts) {
host = host.trim();
Expand All @@ -564,6 +564,17 @@ public static List<HttpHost> getHttpHosts() throws AtlasException {
return httpHosts;
}

public static String getESHosts() throws AtlasException {
Configuration configuration = ApplicationProperties.get();
//get es write hosts if available (ES Isolation)
String esHostNames = configuration.getString(INDEX_WRITE_BACKEND_CONF);
if (StringUtils.isNotEmpty(esHostNames)) {
return esHostNames;
} else {
return configuration.getString(INDEX_BACKEND_CONF);
}
}

private boolean isSuccess(Response response) {
return response.getStatusLine().getStatusCode() == 200;
}
Expand Down
2 changes: 1 addition & 1 deletion webapp/src/main/java/org/apache/atlas/Atlas.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private static void installLogBridge() {
SLF4JBridgeHandler.install();
}

private static void initAccessAuditElasticSearch(Configuration configuration) throws IOException {
private static void initAccessAuditElasticSearch(Configuration configuration) throws IOException, AtlasException {
AccessAuditLogsIndexCreator indexCreator = new AccessAuditLogsIndexCreator(configuration);
indexCreator.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
package org.apache.atlas.util;

import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.audit.utils.CredentialsProviderUtil;
import org.apache.atlas.AtlasException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
Expand All @@ -42,11 +41,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.atlas.repository.audit.ESBasedAuditRepository.INDEX_BACKEND_CONF;
import static org.apache.atlas.repository.audit.ESBasedAuditRepository.INDEX_WRITE_BACKEND_CONF;
import static org.apache.atlas.repository.audit.ESBasedAuditRepository.getESHosts;

public class AccessAuditLogsIndexCreator extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(AccessAuditLogsIndexCreator.class);
Expand Down Expand Up @@ -88,13 +87,13 @@ public class AccessAuditLogsIndexCreator extends Thread {
private int no_of_shards;
private boolean is_completed = false;

public AccessAuditLogsIndexCreator(Configuration configuration) throws IOException {
public AccessAuditLogsIndexCreator(Configuration configuration) throws IOException, AtlasException {
LOG.debug("Starting Ranger audit schema setup in ElasticSearch.");
time_interval = configuration.getLong(ES_TIME_INTERVAL, DEFAULT_ES_TIME_INTERVAL_MS);
user = configuration.getString(ES_CONFIG_USERNAME);

hosts = getHttpHosts(configuration);
port = getPort(configuration);
hosts = getHttpHosts();
port = getPort();

protocol = configuration.getString(ES_CONFIG_PROTOCOL, "http");
index = configuration.getString(ES_CONFIG_INDEX, DEFAULT_INDEX_NAME);
Expand Down Expand Up @@ -156,7 +155,7 @@ public void run() {
}
}
} else {
LOG.error("elasticsearch hosts values are empty. Please set property " + INDEX_BACKEND_CONF);
LOG.error("elasticsearch hosts values are empty. Please set property "+ INDEX_WRITE_BACKEND_CONF);
}

}
Expand Down Expand Up @@ -198,28 +197,6 @@ private void createClient() {
}
}

public static RestClientBuilder getRestClientBuilder(String urls, String protocol, String user, String password, int port) {
RestClientBuilder restClientBuilder = RestClient.builder(
toArray(urls, ",").stream()
.map(x -> new HttpHost(x, port, protocol))
.<HttpHost>toArray(i -> new HttpHost[i])
);
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && !user.equalsIgnoreCase("NONE") && !password.equalsIgnoreCase("NONE")) {

final CredentialsProvider credentialsProvider =
CredentialsProviderUtil.getBasicCredentials(user, password);
restClientBuilder.setHttpClientConfigCallback(clientBuilder ->
clientBuilder.setDefaultCredentialsProvider(credentialsProvider));

} else {
LOG.error("ElasticSearch Credentials not provided!!");
final CredentialsProvider credentialsProvider = null;
restClientBuilder.setHttpClientConfigCallback(clientBuilder ->
clientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
return restClientBuilder;
}

private boolean createIndex() {
boolean exists = false;
if (client == null) {
Expand Down Expand Up @@ -295,23 +272,10 @@ private void logErrorMessageAndWait(String msg, Exception exception) {
}
}

private static String getHosts(Configuration configuration) {
StringBuilder urls = new StringBuilder();
String indexConf = configuration.getString(INDEX_BACKEND_CONF);
String[] hosts = indexConf.split(",");
for (String host : hosts) {
host = host.trim();
String[] hostAndPort = host.split(":");
urls.append(hostAndPort[0]);

}
return urls.toString();
}

public static List<HttpHost> getHttpHosts(Configuration configuration) {
public static List<HttpHost> getHttpHosts() throws AtlasException {
List<HttpHost> httpHosts = new ArrayList<>();

String indexConf = configuration.getString(INDEX_BACKEND_CONF);
String indexConf = getESHosts();
String[] hosts = indexConf.split(",");
for (String host : hosts) {
host = host.trim();
Expand All @@ -326,10 +290,9 @@ public static List<HttpHost> getHttpHosts(Configuration configuration) {
}


private static int getPort(Configuration configuration) {
private static int getPort() throws AtlasException {
int port = 9200;
StringBuilder urls = new StringBuilder();
String indexConf = configuration.getString(INDEX_BACKEND_CONF);
String indexConf = getESHosts();
try {
String[] hosts = indexConf.split(",");
String host = hosts[0];
Expand All @@ -342,15 +305,4 @@ private static int getPort(Configuration configuration) {

return port;
}

public static List<String> toArray(String destListStr, String delim) {
List<String> list = new ArrayList<String>();
if (StringUtils.isNotBlank(destListStr)) {
StringTokenizer tokenizer = new StringTokenizer(destListStr, delim.trim());
while (tokenizer.hasMoreTokens()) {
list.add(tokenizer.nextToken());
}
}
return list;
}
}
Loading