Commit 8a5e6b9

MLH-1477 Replay Failed ES transactions | Bring consistency between 2 stores (#5660)
* MLH-1477 add dlq replay service and api endpoint to monitor status (#5566)
* MLH-1477 dummy commit
* MLH-1477 add branch to build
* MLH-1477 update to trigger a build
* MLH-1477 dummy commit
* MLH-1477 add dlq replay service and api endpoint to monitor status
* MLH-1477 dummy commit
* MLH-1477 wire in only elastic configuration
* MLH-1477 lazily connect to elasticsearch index
* MLH-1477 load indexProvider like janusgraph
* MLH-1477 initialise bootstrap servers
* MLH-1477 increase timeout
* MLH-1477 add more logs for debugging. update serializer
* MLH-1477 reduce poll time
* MLH-1477 use pause and resume
* MLH-1477 improve DLQ handling
* MLH-1477 break on errors
* MLH-1477 seek back when error
* MLH-1477 retry with exponential backoff
* MLH-1477 add tests
* MLH-1477 optimise imports
* MLH-1477 remove comment on latest
* MLH-1477 remove option to start the dlq manually
* MLH-1477 change to non daemon thread and improve destroy and cleanup
* MLH-1477 add dependency (#5619)
* MLH-1477 add dependency
* MLH-1477 add dependency as test
* MLH-1477 refactor dlq to use repair flow to keep it idempotent (#5659)
* MLH-1477 refactor dlq to use repair flow to keep it idempotent
* MLH-1477 handle NPE in mutations map
* MLH-1477 set kafka property in atlas startup for DLQ
* MLH-1477 not use management system
* MLH-1477 remove unused method. also increase poll timeout seconds
* MLH-1477 remove cleanUpTransaction since it is not required
* MLH-1477 remove cleanUpTransaction since it is not required
* MLH-1477 handle NPQ in reindex method
1 parent fb4758c commit 8a5e6b9

14 files changed: +1683 -72 lines changed

.github/workflows/maven.yml

Lines changed: 0 additions & 9 deletions
@@ -22,18 +22,9 @@ name: Java CI with Maven
 on:
   push:
     branches:
-      - alpha
       - beta
-      - development
       - master
       - staging
-      - tagscanary
-      - tagscanarymerge
-      - fixlabels
-      - interceptapis
-      - mlh-1240-improve-cm-refresh-master
-      - mlh1432
-      - mlh-1620
 
 jobs:
   # Detect what changed to optimize workflow execution

LOCAL_SETUP.md

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+# Local Development Setup Guide for Atlas
+
+This guide will help you set up Atlas for local development.
+
+## Prerequisites
+
+### Required Software
+- Java 17 (Recommended: Zulu OpenJDK 17)
+- Maven 3.8+
+- Docker (via Colima for macOS)
+- Git
+- Get the source code from the AtlanHQ repository (An override of Apache Atlas)
+- Download the zip and configuration artifacts from https://atlanhq.atlassian.net/wiki/spaces/c873aeb606dd4834a95d9909a757bfa6/pages/800424446/How+to+run+Atlas+on+the+local+machine
+
+### Java Setup
+1. Install Java 17:
+```bash
+brew install zulu17
+```
+
+2. Set JAVA_HOME:
+```bash
+export JAVA_HOME=/Library/Java/JavaVirtualMachines/zulu-17.jdk/Contents/Home
+export PATH=$JAVA_HOME/bin:$PATH
+```
+
+### Maven Setup
+
+1. Configure GitHub Package Registry access:
+Create or update `~/.m2/settings.xml`:
+```xml
+<settings>
+  <servers>
+    <server>
+      <id>github</id>
+      <username>YOUR_GITHUB_USERNAME</username>
+      <password>YOUR_GITHUB_PAT_TOKEN</password>
+    </server>
+  </servers>
+
+  <profiles>
+    <profile>
+      <id>github</id>
+      <repositories>
+        <repository>
+          <id>github</id>
+          <url>https://maven.pkg.github.com/atlanhq/janusgraph</url>
+        </repository>
+      </repositories>
+    </profile>
+  </profiles>
+
+  <activeProfiles>
+    <activeProfile>github</activeProfile>
+  </activeProfiles>
+</settings>
+```
+
+Note: Generate a GitHub Personal Access Token with `read:packages` scope.
+
+### Docker Setup (using Colima)
+
+1. Install Colima:
+```bash
+brew install colima
+```
+
+2. Start Colima:
+```bash
+colima start
+```
+
+## Building Atlas
+
+1. Clone the repository:
+```bash
+git clone https://github.com/atlanhq/atlas-metastore.git
+cd atlas-metastore
+```
+
+2. Build the project:
+```bash
+mvn clean -Dos.detected.classifier=osx-x86_64 -Dmaven.test.skip -DskipTests -Drat.skip=true -DskipOverlay -DskipEnunciate=true install package -Pdist
+```
+
+## Running Dependencies
+
+Atlas requires several services to run. Use Docker Compose to start them:
+
+1. Required Services:
+- Redis (for caching)
+- Cassandra (for metadata storage)
+- Elasticsearch (for search functionality)
+- Kafka (optional - for notifications)
+
+2. Start the services:
+```bash
+cd deploy
+docker-compose up -d redis cassandra elasticsearch
+```
+
+If you need Kafka:
+```bash
+docker-compose up -d kafka
+```
+
+3. Wait for services to be healthy:
+- Redis: Default port 6379
+- Cassandra: Default port 9042
+- Elasticsearch: Default port 9200
+- Kafka (if enabled): Default port 9092
+
+## Running Atlas
+
+1. Start Atlas server:
+```bash
+java -Datlas.home=deploy/ -Datlas.conf=deploy/conf -Datlas.data=deploy/data -Datlas.log.dir=deploy/logs -Dranger.plugin.atlas.policy.pollIntervalMs=300000 -Dembedded.solr.directory=deploy/data -Dlogback.configurationFile=file:./deploy/conf/atlas-logback.xml -Dzookeeper.snapshot.trust.empty=true --add-opens java.base/java.lang=ALL-UNNAMED -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dorg.apache.http.nio.reactor.ioThreadCount=4 -Dcassandra.connection.pool.max=4 -Djanusgraph.connection.pool.max=2 -Dnetty.eventLoopThreads=4 -XX:+UseCompressedOops -XX:+UseCompressedClassPointers -Xms512m
+org.apache.atlas.Atlas
+```
+
+2. Access the UI:
+- URL: http://localhost:21000
+- Default credentials: admin/admin
+
+## Troubleshooting
+
+1. If services fail to start, check Docker logs:
+```bash
+docker-compose logs -f [service_name]
+```
+
+2. For Atlas server issues, check logs in:
+```
+logs/application.log
+```
+
+3. Common issues:
+- Port conflicts: Ensure no other services are using required ports
+- Memory issues: Adjust Docker resource limits in Colima
+- Connection timeouts: Ensure all required services are healthy before starting Atlas
+
+## Additional Resources
+
+For more detailed information, refer to:
+- [Atlas Documentation](https://atlas.apache.org/documentation.html)
+- [Internal Setup Guide](https://atlanhq.atlassian.net/wiki/spaces/c873aeb606dd4834a95d9909a757bfa6/pages/800424446/How+to+run+Atlas+on+the+local+machine)
+
+## Notes
+
+- The build command skips tests and various checks for faster development builds
+- For production builds, remove the skip flags
+- Keep your GitHub PAT token secure and never commit it to version control
+- Adjust memory and CPU settings in Colima based on your machine's capabilities
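
Review note on LOCAL_SETUP.md: the "Wait for services to be healthy" step lists default ports but gives no way to check them. A minimal sketch of such a check follows, assuming the services listen on `localhost` at the ports the guide names. The class `DependencyPortCheck` and its method are hypothetical and not part of this commit. An open port only shows that something is listening, so this is a first-pass probe and not a real health check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of the commit: probes the default ports listed in the guide. */
class DependencyPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Ports are the defaults named in LOCAL_SETUP.md; adjust if your compose file differs.
        Map<String, Integer> services = new LinkedHashMap<>();
        services.put("Redis", 6379);
        services.put("Cassandra", 9042);
        services.put("Elasticsearch", 9200);
        services.put("Kafka (optional)", 9092);

        services.forEach((name, port) ->
            System.out.printf("%-18s port %d: %s%n", name, port,
                isPortOpen("localhost", port, 1000) ? "open" : "not reachable"));
    }
}
```

Running `main` before starting Atlas prints one line per service. Cassandra in particular may accept connections a while after its container starts, so a "not reachable" result shortly after `docker-compose up` is not necessarily an error.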

intg/src/main/java/org/apache/atlas/type/Constants.java

Lines changed: 4 additions & 0 deletions
@@ -93,5 +93,9 @@ public final class Constants {
         ES_ATLAN_KEYWORD_ANALYZER_CONFIG.put("normalizer", "atlan_normalizer");
     }
 
+    public static final String INDEX_NAME_VERTEX_INDEX = "vertex_index";
+    public static final String INDEX_NAME_FULLTEXT_INDEX = "fulltext_index";
+    public static final String INDEX_NAME_EDGE_INDEX = "edge_index";
+
     private Constants() {}
 }

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -720,7 +720,7 @@
         <httpcomponents-httpcore.version>4.4.13</httpcomponents-httpcore.version>
         <jackson.databind.version>2.13.4.2</jackson.databind.version>
         <jackson.version>2.12.4</jackson.version>
-        <janusgraph.version>1.0.0</janusgraph.version>
+        <janusgraph.version>1.0.2-atlan</janusgraph.version>
         <janusgraph.cassandra.version>0.5.3</janusgraph.cassandra.version>
         <jaxb.api.version>2.3.1</jaxb.api.version>
         <javax-inject.version>1</javax-inject.version>

repository/src/main/java/org/apache/atlas/AtlanElasticSearchIndex.java

Lines changed: 61 additions & 5 deletions
@@ -33,6 +33,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -278,11 +279,66 @@ private static Object convertGeoshape(Geoshape geoshape, Mapping mapping) {
         }
     }
 
-    private BackendException convert(Exception esException) {
-        if (esException instanceof InterruptedException) {
-            return new TemporaryBackendException("Interrupted while waiting for response", esException);
-        } else {
-            return new PermanentBackendException("Unknown exception while executing index operation", esException);
+    private BackendException convert(Exception esException) {
+        if (esException instanceof InterruptedException) {
+            return new TemporaryBackendException("Interrupted while waiting for response", esException);
+        }
+
+        // Check if this is a retryable exception by examining the exception chain
+        Throwable cause = esException;
+        while (cause != null) {
+            final String className = cause.getClass().getName();
+            final String message = cause.getMessage() != null ? cause.getMessage().toLowerCase() : "";
+
+            // Network-related exceptions that should be retried
+            if (className.contains("ConnectException") ||
+                className.contains("SocketTimeoutException") ||
+                className.contains("NoHttpResponseException") ||
+                className.contains("ConnectionClosedException") ||
+                className.contains("SocketException")) {
+                return new TemporaryBackendException("Temporary network exception during ES operation: " + cause.getMessage(), esException);
+            }
+
+            // HTTP status codes that indicate temporary failures
+            if (message.contains("503") || message.contains("service unavailable") ||
+                message.contains("429") || message.contains("too many requests") ||
+                message.contains("408") || message.contains("request timeout") ||
+                message.contains("502") || message.contains("bad gateway") ||
+                message.contains("504") || message.contains("gateway timeout")) {
+                return new TemporaryBackendException("Temporary ES server error: " + cause.getMessage(), esException);
+            }
+
+            // Cluster/node availability issues
+            if (message.contains("no available connection") ||
+                message.contains("connection refused") ||
+                message.contains("connection reset") ||
+                message.contains("broken pipe") ||
+                message.contains("connection pool shut down") ||
+                message.contains("cluster block exception") ||
+                message.contains("node not connected")) {
+                return new TemporaryBackendException("ES cluster temporarily unavailable: " + cause.getMessage(), esException);
             }
+
+            cause = cause.getCause();
+        }
+
+        // Validation errors, mapping errors, and other permanent failures
+        final String exMessage = esException.getMessage() != null ? esException.getMessage().toLowerCase() : "";
+        if (exMessage.contains("mapper_parsing_exception") ||
+            exMessage.contains("illegal_argument_exception") ||
+            exMessage.contains("parsing_exception") ||
+            exMessage.contains("version_conflict") ||
+            exMessage.contains("strict_dynamic_mapping_exception")) {
+            return new PermanentBackendException("Permanent ES error: " + esException.getMessage(), esException);
         }
+
+        // Default to TemporaryBackendException to allow retries for unknown IOException types
+        // Most IOExceptions in Elasticsearch context are transient network issues
+        if (esException instanceof IOException || esException instanceof UncheckedIOException) {
+            return new TemporaryBackendException("Temporary IO exception during ES operation, will retry: " + esException.getMessage(), esException);
+        }
+
+        // For truly unknown exceptions, treat as permanent
+        return new PermanentBackendException("Unknown exception while executing ES operation: " + esException.getMessage(), esException);
+    }
 }
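
Review note on `convert`: the new method matches status codes with plain substring checks such as `message.contains("503")`. Those can also match unrelated digits in a message, for example a document id like `15030`. The sketch below shows one possible variant of the same classification order (temporary checks along the cause chain, then permanent markers on the top-level message, then the I/O default). It uses `instanceof` for the JDK socket exceptions and a word-boundary regex for status codes. It is an illustration under stated assumptions, not the commit's code:

- `EsFailureClassifier` and `Kind` are hypothetical stand-ins for the JanusGraph `TemporaryBackendException` / `PermanentBackendException` split.
- The Apache HttpClient exceptions named in the diff (`NoHttpResponseException`, `ConnectionClosedException`) and the phrase checks such as "connection refused" are omitted so the block needs only the JDK.
- Unlike the diff, `InterruptedException` is checked at every level of the chain.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.regex.Pattern;

/** Illustrative sketch only; EsFailureClassifier and Kind are hypothetical names, not part of the commit. */
class EsFailureClassifier {

    enum Kind { TEMPORARY, PERMANENT }

    // Matches a retryable HTTP status only as a standalone token, so "15030" does not match "503".
    private static final Pattern RETRYABLE_STATUS = Pattern.compile("\\b(408|429|502|503|504)\\b");

    // Same permanent markers as the diff.
    private static final String[] PERMANENT_MARKERS = {
        "mapper_parsing_exception", "illegal_argument_exception", "parsing_exception",
        "version_conflict", "strict_dynamic_mapping_exception"
    };

    static Kind classify(Throwable error) {
        // Walk the cause chain, bounded so a cyclic chain cannot loop forever.
        Throwable cause = error;
        for (int depth = 0; cause != null && depth < 32; depth++) {
            // ConnectException is a subclass of SocketException, so it is covered here.
            if (cause instanceof InterruptedException
                    || cause instanceof SocketException
                    || cause instanceof SocketTimeoutException) {
                return Kind.TEMPORARY;
            }
            String message = cause.getMessage();
            if (message != null && RETRYABLE_STATUS.matcher(message).find()) {
                return Kind.TEMPORARY;
            }
            cause = cause.getCause();
        }

        // Permanent markers are checked on the top-level message only, as in the diff.
        String top = error.getMessage() == null ? "" : error.getMessage().toLowerCase();
        for (String marker : PERMANENT_MARKERS) {
            if (top.contains(marker)) {
                return Kind.PERMANENT;
            }
        }

        // Remaining I/O failures default to retryable; everything else is permanent.
        if (error instanceof IOException || error instanceof UncheckedIOException) {
            return Kind.TEMPORARY;
        }
        return Kind.PERMANENT;
    }
}
```

Where the low-level REST client's response object is available, reading the numeric status from it would likely be more robust than parsing messages. Whether that is reachable from `convert` depends on code outside this diff.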

repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java

Lines changed: 6 additions & 0 deletions
@@ -2345,4 +2345,10 @@ public static Iterator<AtlasEdge> getAdjacentEdgesByLabelWithTimeout(
                 })
                 .blockingGet();
     }
+
+    public Set<AtlasVertex> getVertices(Set<Long> vertexIds) {
+        if (CollectionUtils.isEmpty(vertexIds)) return Collections.emptySet();
+        Set<String> uniqueVertexIds = vertexIds.stream().map(String::valueOf).collect(Collectors.toSet());
+        return graph.getVertices(uniqueVertexIds.toArray(new String[0]));
+    }
 }
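
Review note on `getVertices`: the input is already a `Set<Long>`, and distinct longs always map to distinct strings, so the intermediate `Set<String>` does not remove any further duplicates. The conversion step can be sketched on its own as below. `VertexIdConversion` and `toVertexIdArray` are hypothetical names used for illustration, and the graph lookup itself is left out because it depends on the Atlas graph API.

```java
import java.util.Set;

/** Hypothetical helper, not part of the commit: isolates the id conversion used by getVertices. */
class VertexIdConversion {

    /** Converts numeric vertex ids to the String[] form; returns an empty array for null or empty input. */
    static String[] toVertexIdArray(Set<Long> vertexIds) {
        if (vertexIds == null || vertexIds.isEmpty()) {
            return new String[0];
        }
        // The input set already guarantees uniqueness, so the stream can go straight to an array.
        return vertexIds.stream().map(String::valueOf).toArray(String[]::new);
    }
}
```

The array follows the iteration order of the input set, so callers that need a stable order would have to pass a sorted or insertion-ordered set.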
