Skip to content

Commit 1ef957c

Browse files
committed
Handle index deletion while querying in ES|QL
1 parent afd3a42 commit 1ef957c

File tree

2 files changed

+80
-5
lines changed

2 files changed

+80
-5
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRetryIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.elasticsearch.action.index.IndexRequestBuilder;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.cluster.service.ClusterService;
1114
import org.elasticsearch.index.IndexService;
1215
import org.elasticsearch.index.shard.IndexShard;
1316
import org.elasticsearch.indices.IndicesService;
@@ -20,11 +23,14 @@
2023
import java.util.ArrayList;
2124
import java.util.Collection;
2225
import java.util.List;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
2328
import java.util.concurrent.atomic.AtomicBoolean;
2429

2530
import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
2631
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2732
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.not;
2834

2935
public class EsqlRetryIT extends AbstractEsqlIntegTestCase {
3036

@@ -59,6 +65,37 @@ public void testRetryOnShardFailures() throws Exception {
5965
}
6066
}
6167

68+
public void testQueryWhileDeletingIndices() {
69+
populateIndices();
70+
CountDownLatch waitForDeletion = new CountDownLatch(1);
71+
try {
72+
final AtomicBoolean deleted = new AtomicBoolean();
73+
for (String node : internalCluster().getNodeNames()) {
74+
MockTransportService.getInstance(node)
75+
.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
76+
if (deleted.compareAndSet(false, true)) {
77+
deleteIndexCompletely("log-index-2");
78+
waitForDeletion.countDown();
79+
} else {
80+
assertTrue(waitForDeletion.await(10, TimeUnit.SECONDS));
81+
}
82+
handler.messageReceived(request, channel, task);
83+
});
84+
}
85+
EsqlQueryRequest request = new EsqlQueryRequest();
86+
request.query("FROM log-* | STATS COUNT(timestamp) | LIMIT 1");
87+
request.allowPartialResults(true);
88+
try (var resp = run(request)) {
89+
assertTrue(resp.isPartial());
90+
assertThat(EsqlTestUtils.getValuesList(resp).getFirst().getFirst(), equalTo(4L));
91+
}
92+
} finally {
93+
for (String node : internalCluster().getNodeNames()) {
94+
MockTransportService.getInstance(node).clearAllRules();
95+
}
96+
}
97+
}
98+
6299
private void populateIndices() {
63100
internalCluster().ensureAtLeastNumDataNodes(2);
64101
assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
@@ -88,4 +125,36 @@ private void closeOrFailShards(String nodeName) throws Exception {
88125
}
89126
}
90127
}
128+
129+
/**
130+
* Deletes the given index and ensures it is completely removed from the cluster state and from all nodes
131+
*/
132+
private void deleteIndexCompletely(String indexName) throws Exception {
133+
assertAcked(indicesAdmin().prepareDelete(indexName));
134+
String[] nodeNames = internalCluster().getNodeNames();
135+
assertBusy(() -> {
136+
for (String nodeName : nodeNames) {
137+
ClusterState clusterState = internalCluster().getInstance(ClusterService.class, nodeName).state();
138+
for (IndexMetadata imd : clusterState.metadata().indicesAllProjects()) {
139+
assertThat(
140+
"Index [" + indexName + "] still exists on the cluster state on [" + nodeName + "]",
141+
imd.getIndex().getName(),
142+
not(equalTo(indexName))
143+
);
144+
}
145+
}
146+
for (String nodeName : nodeNames) {
147+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
148+
for (IndexService indexService : indicesService) {
149+
for (IndexShard indexShard : indexService) {
150+
assertThat(
151+
"Index [" + indexName + "] still exists on node [" + nodeName + "]",
152+
indexShard.shardId().getIndexName(),
153+
not(equalTo(indexName))
154+
);
155+
}
156+
}
157+
}
158+
});
159+
}
91160
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
import org.elasticsearch.compute.operator.DriverCompletionInfo;
2828
import org.elasticsearch.compute.operator.FailureCollector;
2929
import org.elasticsearch.index.Index;
30+
import org.elasticsearch.index.IndexNotFoundException;
3031
import org.elasticsearch.index.query.QueryBuilder;
3132
import org.elasticsearch.index.shard.ShardId;
33+
import org.elasticsearch.index.shard.ShardNotFoundException;
3234
import org.elasticsearch.search.SearchShardTarget;
3335
import org.elasticsearch.search.internal.AliasFilter;
3436
import org.elasticsearch.tasks.CancellableTask;
@@ -515,15 +517,19 @@ Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
515517
var project = projectResolver.getProjectState(clusterService.state());
516518
var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
517519
for (var shardId : shardIds) {
518-
nodes.put(
519-
shardId,
520-
project.routingTable()
520+
List<DiscoveryNode> allocatedNodes;
521+
try {
522+
allocatedNodes = project.routingTable()
521523
.shardRoutingTable(shardId)
522524
.allShards()
523525
.filter(shard -> shard.active() && shard.isSearchable())
524526
.map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
525-
.toList()
526-
);
527+
.toList();
528+
} catch (Exception ex) {
529+
assert ex instanceof IndexNotFoundException || ex instanceof ShardNotFoundException : new AssertionError(ex);
530+
continue;
531+
}
532+
nodes.put(shardId, allocatedNodes);
527533
}
528534
return nodes;
529535
}

0 commit comments

Comments
 (0)