diff --git a/docs/changelog/137694.yaml b/docs/changelog/137694.yaml new file mode 100644 index 0000000000000..4654293ce6f80 --- /dev/null +++ b/docs/changelog/137694.yaml @@ -0,0 +1,5 @@ +pr: 137694 +summary: Iterate directly over contents of `RoutingNode` +area: Allocation +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 2b5806724c75f..d056e501b29e9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -213,18 +214,16 @@ void remove(ShardRouting shard) { assert invariant(); } - private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0]; - - public ShardRouting[] initializing() { - return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + public Iterable initializing() { + return Iterables.assertReadOnly(initializingShards); } - public ShardRouting[] relocating() { - return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + public Iterable relocating() { + return Iterables.assertReadOnly(relocatingShards); } - public ShardRouting[] started() { - return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY); + public Iterable started() { + return Iterables.assertReadOnly(startedShards); } /** @@ -313,6 +312,8 @@ public String toString() { return sb.toString(); } + private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0]; + public ShardRouting[] copyShards() { return shards.values().toArray(EMPTY_SHARD_ROUTING_ARRAY); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 274bd3d818261..5481a4f34cca7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -294,12 +296,14 @@ public DesiredBalance compute( final var rerouteExplanation = command.execute(routingAllocation, false); assert rerouteExplanation.decisions().type() != Decision.Type.NO : "should have thrown for NO decision"; if (rerouteExplanation.decisions().type() != Decision.Type.NO) { - final ShardRouting[] initializingShards = routingNodes.node( + final Iterator initializingShardsIterator = routingNodes.node( routingAllocation.nodes().resolveNode(command.toNode()).getId() - ).initializing(); - assert initializingShards.length == 1 - : "expect exactly one relocating shard, but got: " + List.of(initializingShards); - final var initializingShard = initializingShards[0]; + ).initializing().iterator(); + assert initializingShardsIterator.hasNext(); + final var initializingShard = initializingShardsIterator.next(); + assert initializingShardsIterator.hasNext() == false + : "expect exactly one relocating shard, but got: " + + Iterators.toList(Iterators.concat(Iterators.single(initializingShard), initializingShardsIterator)); assert routingAllocation.nodes() .resolveNode(command.fromNode()) .getId() diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index b66d9e890e0ba..708656cd52288 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -127,11 +127,16 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing // Count the primaries currently doing recovery on the node, to ensure the primariesInitialRecoveries setting is obeyed. int primariesInRecovery = 0; + final var returnUnexplainedDecision = allocation.debugDecision() == false; for (ShardRouting shard : node.initializing()) { // when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node* // we only count initial recoveries here, so we need to make sure that relocating node is null if (shard.primary() && shard.relocatingNodeId() == null) { primariesInRecovery++; + if (returnUnexplainedDecision && primariesInRecovery >= primariesInitialRecoveries) { + // bail out early if we don't need the final total + return THROTTLE; + } } } if (primariesInRecovery >= primariesInitialRecoveries) { diff --git a/server/src/main/java/org/elasticsearch/common/collect/Iterators.java b/server/src/main/java/org/elasticsearch/common/collect/Iterators.java index 346d9b2598f20..bc963165dbb52 100644 --- a/server/src/main/java/org/elasticsearch/common/collect/Iterators.java +++ b/server/src/main/java/org/elasticsearch/common/collect/Iterators.java @@ -9,6 +9,7 @@ package org.elasticsearch.common.collect; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; import java.util.ArrayList; @@ -507,6 +508,42 @@ public T next() { } } + /** + * Adds a wrapper around {@code iterator} which asserts that {@link Iterator#remove()} is not called. + */ + public static Iterator assertReadOnly(final Iterator iterator) { + return Assertions.ENABLED ? new AssertReadOnlyIterator<>(Objects.requireNonNull(iterator)) : iterator; + } + + private static class AssertReadOnlyIterator implements Iterator { + + private final Iterator delegate; + + AssertReadOnlyIterator(Iterator delegate) { + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public T next() { + return delegate.next(); + } + + @Override + public void forEachRemaining(Consumer action) { + delegate.forEachRemaining(action); + } + + @Override + public void remove() { + throw new AssertionError(); + } + } + public static boolean equals(Iterator iterator1, Iterator iterator2, BiPredicate itemComparer) { if (iterator1 == null) { return iterator2 == null; diff --git a/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java b/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java index 8ebf30c2c688f..00e5c6c2a0720 100644 --- a/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java +++ b/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java @@ -9,6 +9,9 @@ package org.elasticsearch.common.util.iterable; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.core.Assertions; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -81,4 +84,15 @@ public static int indexOf(Iterable iterable, Predicate predicate) { public static long size(Iterable iterable) { return StreamSupport.stream(iterable.spliterator(), false).count(); } + + /** + * Adds a wrapper around {@code iterable} which asserts that {@link Iterator#remove()} is not called on the iterator it returns. + */ + public static Iterable assertReadOnly(Iterable iterable) { + if (Assertions.ENABLED) { + return () -> Iterators.assertReadOnly(iterable.iterator()); + } else { + return iterable; + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java index 9ded7670e95d7..8846718cbd788 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java @@ -18,9 +18,8 @@ import org.elasticsearch.test.ESTestCase; import java.net.InetAddress; -import java.util.Arrays; +import java.util.HashSet; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -161,7 +160,9 @@ public void testReturnStartedShards() { } private static Set startedShardsSet(RoutingNode routingNode) { - return Arrays.stream(routingNode.started()).map(ShardRouting::shardId).collect(Collectors.toSet()); + final var result = new HashSet(); + routingNode.started().forEach(shardRouting -> result.add(shardRouting.shardId())); + return result; } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 7aab8001a0c5b..a3389a17ba3d8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.TimeProvider; @@ -103,10 +104,10 @@ import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; @@ -581,7 +582,7 @@ public void allocate(RoutingAllocation allocation) { allocation.routingNodes().getRelocatingShardCount(), equalTo(0) ); - assertThat(allocation.routingNodes().node("node-2").started(), arrayWithSize(2)); + assertThat(Iterators.toList(allocation.routingNodes().node("node-2").started().iterator()), hasSize(2)); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java b/server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java index 0f52bf9a5efe1..e55d71c752fd4 100644 --- a/server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java @@ -328,6 +328,19 @@ public void testCycle() { } } + public void testAssertReadOnly() { + assumeTrue("assertions enabled", Assertions.ENABLED); + final List innerList = new ArrayList<>(List.of(1, 2, 3, 4)); + assertTrue(Iterators.equals(innerList.iterator(), Iterators.assertReadOnly(innerList.iterator()), Objects::equals)); + + final var readonly = Iterators.assertReadOnly(innerList.iterator()); + assertTrue(readonly.hasNext()); + assertEquals(Integer.valueOf(1), readonly.next()); + expectThrows(AssertionError.class, readonly::remove); + + assertEquals(List.of(1, 2, 3, 4), innerList); + } + public void testEquals() { final BiPredicate notCalled = (a, b) -> { throw new AssertionError("not called"); };