diff --git a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java
index 3debc871121..26cfc38f2c0 100644
--- a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java
+++ b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java
@@ -41,7 +41,7 @@ public void getClassesViaHardcoded_classesPresent() throws Exception {
@Test
public void stockProviders() {
LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry();
- assertThat(defaultRegistry.providers()).hasSize(4);
+ assertThat(defaultRegistry.providers()).hasSize(5);
LoadBalancerProvider pickFirst = defaultRegistry.getProvider("pick_first");
assertThat(pickFirst).isInstanceOf(PickFirstLoadBalancerProvider.class);
@@ -56,7 +56,13 @@ public void stockProviders() {
"outlier_detection_experimental");
assertThat(outlierDetection.getClass().getName()).isEqualTo(
"io.grpc.util.OutlierDetectionLoadBalancerProvider");
- assertThat(roundRobin.getPriority()).isEqualTo(5);
+ assertThat(outlierDetection.getPriority()).isEqualTo(5);
+
+ LoadBalancerProvider deterministicSubsetting = defaultRegistry.getProvider(
+ "deterministic_subsetting");
+ assertThat(deterministicSubsetting.getClass().getName()).isEqualTo(
+ "io.grpc.util.DeterministicSubsettingLoadBalancerProvider");
+ assertThat(deterministicSubsetting.getPriority()).isEqualTo(5);
LoadBalancerProvider grpclb = defaultRegistry.getProvider("grpclb");
assertThat(grpclb).isInstanceOf(GrpclbLoadBalancerProvider.class);
diff --git a/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancer.java b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancer.java
new file mode 100644
index 00000000000..f0301cef0e3
--- /dev/null
+++ b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancer.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.Internal;
+import io.grpc.LoadBalancer;
+import io.grpc.Status;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Wraps a child {@code LoadBalancer}, separating the total set of backends into smaller subsets for
+ * the child balancer to balance across.
+ *
+ *
This implements deterministic subsetting gRFC:
+ * https://github.com/grpc/proposal/blob/master/A68-deterministic-subsetting-lb-policy.md
+ */
+@Internal
+public final class DeterministicSubsettingLoadBalancer extends LoadBalancer {
+
+ private final GracefulSwitchLoadBalancer switchLb;
+
+ @Override
+ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ DeterministicSubsettingLoadBalancerConfig config =
+ (DeterministicSubsettingLoadBalancerConfig)
+ resolvedAddresses.getLoadBalancingPolicyConfig();
+
+ switchLb.switchTo(config.childPolicy.getProvider());
+
+ ResolvedAddresses subsetAddresses = buildSubsets(resolvedAddresses, config);
+
+ switchLb.handleResolvedAddresses(
+ subsetAddresses.toBuilder()
+ .setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
+ .build());
+ return true;
+ }
+
+ // implements the subsetting algorithm, as described in A68:
+ // https://github.com/grpc/proposal/pull/383
+ private ResolvedAddresses buildSubsets(
+ ResolvedAddresses allAddresses, DeterministicSubsettingLoadBalancerConfig config) {
+ // The map should only retain entries for addresses in this latest update.
+ ArrayList addresses = new ArrayList<>();
+ for (EquivalentAddressGroup addressGroup : allAddresses.getAddresses()) {
+ addresses.addAll(addressGroup.getAddresses());
+ }
+
+ if (addresses.size() <= config.subsetSize) {
+ return allAddresses;
+ }
+ if (config.sortAddresses) {
+ // If we sort, we do so via the string representation of the SocketAddress.
+ addresses.sort(new AddressComparator());
+ }
+
+ Integer backendCount = addresses.size();
+ Integer subsetCount = backendCount / config.subsetSize;
+
+ Integer round = config.clientIndex / subsetCount;
+
+ Integer excludedCount = backendCount % config.subsetSize;
+ Integer excludedStart = (round * excludedCount) % backendCount;
+ Integer excludedEnd = (excludedStart + excludedCount) % backendCount;
+ if (excludedStart <= excludedEnd) {
+ List subList = addresses.subList(0, excludedStart);
+ subList.addAll(addresses.subList(excludedEnd, backendCount));
+ addresses = new ArrayList<>(subList);
+ } else {
+ addresses = new ArrayList<>(addresses.subList(excludedEnd, excludedStart));
+ }
+
+ Random r = new Random(round);
+ Collections.shuffle(addresses, r);
+
+ Integer subsetId = config.clientIndex % subsetCount;
+
+ Integer start = subsetId * config.subsetSize;
+ Integer end = start + config.subsetSize;
+
+ List subset = addresses.subList(start, end);
+
+ ArrayList eaglist = new ArrayList<>();
+
+ // Create new EAGs per address
+ for (SocketAddress addr : subset) {
+ eaglist.add(new EquivalentAddressGroup(addr));
+ }
+
+ ResolvedAddresses.Builder builder = allAddresses.toBuilder();
+ return builder.setAddresses(eaglist).build();
+ }
+
+ @Override
+ public void handleNameResolutionError(Status error) {
+ switchLb.handleNameResolutionError(error);
+ }
+
+ @Override
+ public void shutdown() {
+ switchLb.shutdown();
+ }
+
+ public DeterministicSubsettingLoadBalancer(Helper helper) {
+ switchLb = new GracefulSwitchLoadBalancer(checkNotNull(helper, "helper"));
+ }
+
+ @VisibleForTesting
+ static class AddressComparator implements Comparator {
+ // For consistency with the golang instrumentation, this assumes toString is overridden such
+ // that it is a string representation of an IP. Though any string representation of a
+ // SocketAddress will work here, other definitions of toString may yield differing results from
+ // the golang instrumentation.
+ @Override
+ public int compare(SocketAddress o1, SocketAddress o2) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ }
+
+ public static final class DeterministicSubsettingLoadBalancerConfig {
+
+ public final Integer clientIndex;
+ public final Integer subsetSize;
+ public final Boolean sortAddresses;
+
+ public final PolicySelection childPolicy;
+
+ private DeterministicSubsettingLoadBalancerConfig(
+ Integer clientIndex,
+ Integer subsetSize,
+ Boolean sortAddresses,
+ PolicySelection childPolicy) {
+ this.clientIndex = clientIndex;
+ this.subsetSize = subsetSize;
+ this.sortAddresses = sortAddresses;
+ this.childPolicy = childPolicy;
+ }
+
+ public static class Builder {
+ Integer clientIndex;
+ Integer subsetSize = 10;
+
+ Boolean sortAddresses;
+ PolicySelection childPolicy;
+
+ public Builder setClientIndex(Integer clientIndex) {
+ checkState(clientIndex != null);
+ // Indices must be positive integers.
+ checkState(clientIndex >= 0);
+ this.clientIndex = clientIndex;
+ return this;
+ }
+
+ public Builder setSubsetSize(Integer subsetSize) {
+ checkArgument(subsetSize != null);
+ // subsetSize of 1 is equivalent to `pick_first`. Use that policy if that behavior is
+ // desired.
+ // Fallback to default of 10 of condition is not satisfied.
+ checkArgument(subsetSize > 1);
+ this.subsetSize = subsetSize;
+ return this;
+ }
+
+ public Builder setSortAddresses(Boolean sortAddresses) {
+ checkArgument(sortAddresses != null);
+ this.sortAddresses = sortAddresses;
+ return this;
+ }
+
+ public Builder setChildPolicy(PolicySelection childPolicy) {
+ checkState(childPolicy != null);
+ this.childPolicy = childPolicy;
+ return this;
+ }
+
+ public DeterministicSubsettingLoadBalancerConfig build() {
+ checkState(childPolicy != null);
+ checkState(clientIndex != null);
+ return new DeterministicSubsettingLoadBalancerConfig(
+ clientIndex, subsetSize, sortAddresses, childPolicy);
+ }
+ }
+ }
+}
diff --git a/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancerProvider.java b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancerProvider.java
new file mode 100644
index 00000000000..bc967564b0b
--- /dev/null
+++ b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancerProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import io.grpc.Internal;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.LoadBalancerRegistry;
+import io.grpc.NameResolver.ConfigOrError;
+import io.grpc.Status;
+import io.grpc.internal.JsonUtil;
+import io.grpc.internal.ServiceConfigUtil;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import java.util.List;
+import java.util.Map;
+
+@Internal
+public final class DeterministicSubsettingLoadBalancerProvider extends LoadBalancerProvider {
+
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return new DeterministicSubsettingLoadBalancer(helper);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public int getPriority() {
+ return 5;
+ }
+
+ @Override
+ public String getPolicyName() {
+ return "deterministic_subsetting";
+ }
+
+ @Override
+ public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) {
+ try {
+ return parseLoadBalancingPolicyConfigInternal(rawConfig);
+ } catch (RuntimeException e) {
+ return ConfigOrError.fromError(
+ Status.UNAVAILABLE
+ .withCause(e)
+ .withDescription("Failed parsing configuration for " + getPolicyName()));
+ }
+ }
+
+ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawConfig) {
+ Integer clientIndex = JsonUtil.getNumberAsInteger(rawConfig, "clientIndex");
+ Integer subsetSize = JsonUtil.getNumberAsInteger(rawConfig, "subsetSize");
+ Boolean sortAddresses = JsonUtil.getBoolean(rawConfig, "sortAddresses");
+
+ List childConfigCandidates =
+ ServiceConfigUtil.unwrapLoadBalancingConfigList(
+ JsonUtil.getListOfObjects(rawConfig, "childPolicy"));
+ if (childConfigCandidates == null || childConfigCandidates.isEmpty()) {
+ return ConfigOrError.fromError(
+ Status.INTERNAL.withDescription(
+ "No child policy in deterministic_subsetting LB policy " + rawConfig));
+ }
+
+ ConfigOrError selectedConfig =
+ ServiceConfigUtil.selectLbPolicyFromList(
+ childConfigCandidates, LoadBalancerRegistry.getDefaultRegistry());
+
+ DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig.Builder
+ configBuilder =
+ new DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig
+ .Builder();
+
+ configBuilder.setChildPolicy((PolicySelection) selectedConfig.getConfig());
+
+ if (clientIndex != null) {
+ configBuilder.setClientIndex(clientIndex);
+ } else {
+ return ConfigOrError.fromError(
+ Status.INTERNAL.withDescription(
+ "No client index set, cannot determine subsets " + rawConfig));
+ }
+
+ if (subsetSize != null) {
+ configBuilder.setSubsetSize(subsetSize);
+ }
+
+ if (sortAddresses != null) {
+ configBuilder.setSortAddresses(sortAddresses);
+ }
+ return ConfigOrError.fromConfig(configBuilder.build());
+ }
+}
diff --git a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
index 1fdd69cb00b..9d36f44c511 100644
--- a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
+++ b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
@@ -1,2 +1,3 @@
io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider
io.grpc.util.OutlierDetectionLoadBalancerProvider
+io.grpc.util.DeterministicSubsettingLoadBalancerProvider
diff --git a/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerProviderTest.java b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerProviderTest.java
new file mode 100644
index 00000000000..36ead450bea
--- /dev/null
+++ b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerProviderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.InternalServiceProviders;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.NameResolver.ConfigOrError;
+import io.grpc.Status;
+import io.grpc.internal.JsonParser;
+import io.grpc.util.DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig;
+import java.io.IOException;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DeterministicSubsettingLoadBalancerProviderTest {
+
+ private final DeterministicSubsettingLoadBalancerProvider provider =
+ new DeterministicSubsettingLoadBalancerProvider();
+
+ @Test
+ public void registered() {
+ for (LoadBalancerProvider current :
+ InternalServiceProviders.getCandidatesViaServiceLoader(
+ LoadBalancerProvider.class, getClass().getClassLoader())) {
+ if (current instanceof DeterministicSubsettingLoadBalancerProvider) {
+ return;
+ }
+ }
+ fail("DeterministicSubsettingLoadBalancerProvider not registered");
+ }
+
+ @Test
+ public void providesLoadBalancer() {
+ Helper helper = mock(Helper.class);
+ assertThat(provider.newLoadBalancer(helper))
+ .isInstanceOf(DeterministicSubsettingLoadBalancer.class);
+ }
+
+ @Test
+ public void parseConfigRequiresClientIdx() throws IOException {
+ String config = "{ \"childPolicy\" : [{\"round_robin\" : {}}] } ";
+
+ ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(parseJsonObject(config));
+ assertThat(configOrError.getError()).isNotNull();
+ assertThat(configOrError.getError().toString())
+ .isEqualTo(
+ Status.INTERNAL
+ .withDescription(
+ "No client index set, cannot determine subsets "
+ + "{childPolicy=[{round_robin={}}]}")
+ .toString());
+ }
+
+ @Test
+ public void parseConfigWithDefaults() throws IOException {
+ String lbConfig =
+ "{ \"clientIndex\" : 0, "
+ + "\"childPolicy\" : [{\"round_robin\" : {}}], "
+ + "\"sortAddresses\" : false }";
+ ConfigOrError configOrError =
+ provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
+ System.out.println(configOrError);
+ assertThat(configOrError.getConfig()).isNotNull();
+ DeterministicSubsettingLoadBalancerConfig config =
+ (DeterministicSubsettingLoadBalancerConfig) configOrError.getConfig();
+
+ assertThat(config.clientIndex).isEqualTo(0);
+ assertThat(config.sortAddresses).isEqualTo(false);
+ assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
+
+ assertThat(config.subsetSize).isEqualTo(10);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map parseJsonObject(String json) throws IOException {
+ return (Map) JsonParser.parse(json);
+ }
+
+ @Test
+ public void parseConfigWithCustomSubsetSize() throws IOException {
+ String lbConfig =
+ "{ \"clientIndex\" : 0, "
+ + "\"subsetSize\" : 3, "
+ + "\"childPolicy\" : [{\"round_robin\" : {}}], "
+ + "\"sortAddresses\" : false }";
+
+ ConfigOrError configOrError =
+ provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig));
+ assertThat(configOrError.getConfig()).isNotNull();
+ DeterministicSubsettingLoadBalancerConfig config =
+ (DeterministicSubsettingLoadBalancerConfig) configOrError.getConfig();
+ assertThat(config.subsetSize).isEqualTo(3);
+ }
+}
diff --git a/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerTest.java b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerTest.java
new file mode 100644
index 00000000000..3c08c744077
--- /dev/null
+++ b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerTest.java
@@ -0,0 +1,473 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.CreateSubchannelArgs;
+import io.grpc.LoadBalancer.ResolvedAddresses;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelStateListener;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.Status;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import io.grpc.internal.TestUtils;
+import io.grpc.util.DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig;
+import io.grpc.util.OutlierDetectionLoadBalancerTest.FakeSocketAddress;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
+
+public class DeterministicSubsettingLoadBalancerTest {
+
+ private List servers = Lists.newArrayList();
+ private Map, Subchannel> subchannels = Maps.newLinkedHashMap();
+
+ private final Map subchannelStateListeners =
+ Maps.newLinkedHashMap();
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+ @Mock private LoadBalancer.Helper mockHelper;
+ @Mock private LoadBalancer mockChildLb;
+ @Mock private SocketAddress mockSocketAddress;
+ @Captor private ArgumentCaptor connectivityStateCaptor;
+ @Captor private ArgumentCaptor errorPickerCaptor;
+
+ @Captor private ArgumentCaptor resolvedAddrCaptor;
+
+ private final LoadBalancerProvider mockChildLbProvider =
+ new TestUtils.StandardLoadBalancerProvider("foo_policy") {
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return mockChildLb;
+ }
+ };
+
+ private DeterministicSubsettingLoadBalancer loadBalancer;
+
+ private final LoadBalancerProvider roundRobinLbProvider =
+ new TestUtils.StandardLoadBalancerProvider("round_robin") {
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return new RoundRobinLoadBalancer(helper);
+ }
+ };
+
+ private void setupBackends(int backendCount) {
+ servers = Lists.newArrayList();
+ subchannels = Maps.newLinkedHashMap();
+ for (int i = 0; i < backendCount; i++) {
+ SocketAddress addr = new FakeSocketAddress("server" + i);
+ EquivalentAddressGroup e = new EquivalentAddressGroup(addr);
+ servers.add(e);
+ Subchannel sc = mock(Subchannel.class);
+ subchannels.put(Arrays.asList(e), sc);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ loadBalancer = new DeterministicSubsettingLoadBalancer(mockHelper);
+ }
+
+ public void addMock() {
+ when(mockHelper.createSubchannel(any(LoadBalancer.CreateSubchannelArgs.class))).then(
+ new Answer() {
+ @Override
+ public Subchannel answer(InvocationOnMock invocation) throws Throwable {
+ CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0];
+ final Subchannel subchannel = subchannels.get(args.getAddresses());
+ when(subchannel.getAllAddresses()).thenReturn(args.getAddresses());
+ when(subchannel.getAttributes()).thenReturn(args.getAttributes());
+ doAnswer(new Answer() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ subchannelStateListeners.put(subchannel,
+ (SubchannelStateListener) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(subchannel).start(any(SubchannelStateListener.class));
+ return subchannel;
+ }
+ });
+ }
+
+ @Test
+ public void handleNameResoutionError_noChildLb() {
+ loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
+
+ verify(mockHelper)
+ .updateBalancingState(connectivityStateCaptor.capture(), errorPickerCaptor.capture());
+ assertThat(connectivityStateCaptor.getValue()).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
+ }
+
+ @Test
+ public void handleNameResolutionError_withChildLb() {
+ DeterministicSubsettingLoadBalancerConfig config =
+ new DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(2)
+ .setClientIndex(0)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(mockChildLbProvider, null))
+ .build();
+
+ loadBalancer.acceptResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress)))
+ .setLoadBalancingPolicyConfig(config)
+ .build());
+ loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
+
+ verify(mockChildLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED);
+ }
+
+ @Test
+ public void shutdown() {
+ DeterministicSubsettingLoadBalancerConfig config =
+ new DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(2)
+ .setClientIndex(0)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(mockChildLbProvider, null))
+ .build();
+
+ loadBalancer.acceptResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress)))
+ .setLoadBalancingPolicyConfig(config)
+ .build());
+ loadBalancer.shutdown();
+ verify(mockChildLb).shutdown();
+ }
+
+ @Test
+ public void addressComparator() {
+ setupBackends(5);
+ List sorted = Lists.newArrayList();
+ for (EquivalentAddressGroup eag : servers) {
+ sorted.addAll(eag.getAddresses());
+ }
+
+ Collections.shuffle(servers);
+ List addresses = Lists.newArrayList();
+ for (EquivalentAddressGroup eag : servers) {
+ addresses.addAll(eag.getAddresses());
+ }
+
+ assertThat(addresses).isNotEqualTo(sorted);
+ addresses.sort(new DeterministicSubsettingLoadBalancer.AddressComparator());
+
+ assertThat(addresses).isEqualTo(sorted);
+ }
+
+ @Test
+ public void acceptResolvedAddresses_mocked() {
+ int subsetSize = 3;
+ DeterministicSubsettingLoadBalancerConfig config =
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setClientIndex(0)
+ .setSortAddresses(true)
+ .setChildPolicy(new PolicySelection(mockChildLbProvider, null))
+ .build();
+
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress)))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+
+ assertThat(loadBalancer.acceptResolvedAddresses(resolvedAddresses)).isTrue();
+
+ verify(mockChildLb)
+ .handleResolvedAddresses(
+ resolvedAddresses.toBuilder()
+ .setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
+ .build());
+ }
+
+ @Test
+ public void acceptResolvedAddresses() {
+ addMock();
+ setupBackends(6);
+ int subsetSize = 3;
+ DeterministicSubsettingLoadBalancerConfig config =
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setClientIndex(0)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(roundRobinLbProvider, null))
+ .build();
+
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+
+ assertThat(loadBalancer.acceptResolvedAddresses(resolvedAddresses)).isTrue();
+
+ int insubset = 0;
+ for (Subchannel subchannel : subchannels.values()) {
+ LoadBalancer.SubchannelStateListener sc = subchannelStateListeners.get(subchannel);
+ if (sc != null) { // it might be null if it's not in the subset.
+ insubset += 1;
+ sc.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ }
+ }
+
+ assertThat(insubset).isEqualTo(subsetSize);
+ }
+
+ @Test
+ public void sortingBackends() {
+ setupBackends(4);
+ // Shuffle servers so that they're not in 0, 1, 2 order
+ List shuffledServers =
+ Lists.newArrayList(servers.get(1), servers.get(3), servers.get(2), servers.get(0));
+ int subsetSize = 2;
+ DeterministicSubsettingLoadBalancerConfig sortConfig =
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setClientIndex(0)
+ .setSortAddresses(true)
+ .setChildPolicy(new PolicySelection(mockChildLbProvider, null))
+ .build();
+
+ DeterministicSubsettingLoadBalancerConfig dontSortConfig =
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setClientIndex(0)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(mockChildLbProvider, null))
+ .build();
+
+ List configs =
+ Lists.newArrayList(sortConfig, dontSortConfig);
+ List> actual = Lists.newArrayList();
+ for (DeterministicSubsettingLoadBalancerConfig config : configs) {
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(shuffledServers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+ verify(mockChildLb, atLeastOnce()).handleResolvedAddresses(resolvedAddrCaptor.capture());
+ // Verify ChildLB is only getting subsetSize ResolvedAddresses each time
+ assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(subsetSize);
+ actual.add(resolvedAddrCaptor.getValue().getAddresses());
+ }
+ List actualSorted = actual.get(0);
+ List actualUnsorted = actual.get(1);
+
+ // We will sort, and then round 0 will shift from 0,1,2,3 to 3,0,1,2
+ assertThat(actualSorted).isEqualTo(Lists.newArrayList(servers.get(3), servers.get(0)));
+ // We will not sort, but round 0 will shift from 1,3,2,0 to 0,1,3,2 (same order given indices)
+ assertThat(actualUnsorted).isEqualTo(Lists.newArrayList(servers.get(0), servers.get(1)));
+ }
+
+ @Test
+ public void closesUnusedConns() {
+ addMock();
+ setupBackends(6);
+ List configs = Lists.newArrayList();
+ for (int i = 0; i < 2; i++) {
+ configs.add(
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(3)
+ .setClientIndex(i)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(roundRobinLbProvider, null))
+ .build());
+ }
+ Iterator scIterator = subchannels.values().iterator();
+ scIterator.next(); // subchannel0
+ Subchannel subchannel1 = scIterator.next();
+ Subchannel subchannel2 = scIterator.next();
+ scIterator.next(); // subchannel3
+ Subchannel subchannel4 = scIterator.next();
+ scIterator.next(); // subchannel5
+
+ // In the first call to RR.acceptResolvedAddresses, all subchannels will be new
+ // with nothing to close. in the second iteration, we need to remove the subchannels
+ // from the first subset.
+ List> subsets =
+ Lists.newArrayList(
+ Lists.newArrayList(), Lists.newArrayList(subchannel4, subchannel1, subchannel2));
+ int newconns = 0;
+
+ for (int i = 0; i < 2; i++) {
+ DeterministicSubsettingLoadBalancerConfig config = configs.get(i);
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+ for (Subchannel sc : subsets.get(i)) {
+ verify(sc).shutdown();
+ }
+ for (Subchannel sc : subchannels.values()) {
+ LoadBalancer.SubchannelStateListener ssl = subchannelStateListeners.get(sc);
+ if (ssl != null) {
+ newconns += 1;
+ ssl.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ }
+ }
+ subchannelStateListeners.clear();
+ }
+ assertThat(newconns).isEqualTo(6);
+ }
+
+ @Test
+ public void reusesConns() {
+ addMock();
+ setupBackends(3);
+ List configs = Lists.newArrayList();
+ for (int i = 0; i < 3; i++) {
+ configs.add(
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(3)
+ .setClientIndex(i)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(roundRobinLbProvider, null))
+ .build());
+ }
+
+ List perRun = Lists.newArrayList();
+
+ for (DeterministicSubsettingLoadBalancerConfig config : configs) {
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+ int numSubchannelsOpened = 0;
+ for (Subchannel subchannel : subchannels.values()) {
+ LoadBalancer.SubchannelStateListener sc = subchannelStateListeners.get(subchannel);
+ if (sc != null) {
+ sc.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ numSubchannelsOpened += 1;
+ }
+ }
+ perRun.add(numSubchannelsOpened);
+ subchannelStateListeners.clear();
+ }
+ assertThat(perRun).isEqualTo(Lists.newArrayList(3, 0, 0));
+ }
+
+ @Test
+ public void backendsCanBeDistributedEvenly() {
+ // Backends can be distributed evenly, so they should be. Therefore, maxDiff = 0
+ verifyCreatesSubsets(12, 8, 3, 0);
+ }
+
+ @Test
+ public void backendsCanNotBeDistributedEvenly() {
+ // Backends can't be distributed evenly because there are excluded backends in every round and
+ // not enough clients to fill the last round. This provides 2 opportunities for a backend to be
+ // excluded, so the maxDiff is its maximum, 2
+ verifyCreatesSubsets(37, 22, 5, 2);
+ }
+
+ @Test
+ public void notEnoughClientsForLastRound() {
+ // There are no excluded backends in each round, but there are not enough clients for the
+ // last round, meaning there is only one chance for a backend to be excluded.
+ // Therefore, maxDiff =1
+ verifyCreatesSubsets(20, 7, 5, 1);
+ }
+
+ @Test
+ public void excludedBackendsInEveryRound() {
+ // There are enough clients to fill the last round, but there are excluded backends
+ // in every round, meaning there is only one chance for a backend to be excluded.
+ // Therefore, maxDiff =1
+ verifyCreatesSubsets(21, 8, 5, 1);
+ }
+
+ @Test
+ public void excludedStartBiggerThanEnd() {
+ // There are 3 excluded backends on each round, and sometimes the selected excluded backends
+ // wrap around.
+ verifyCreatesSubsets(7, 3, 4, 1);
+ }
+
+ public void verifyCreatesSubsets(int backends, int clients, int subsetSize, int maxDiff) {
+ setupBackends(backends);
+ List configs = Lists.newArrayList();
+ for (int i = 0; i < clients; i++) {
+ configs.add(
+ new DeterministicSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setClientIndex(i)
+ .setSortAddresses(false)
+ .setChildPolicy(new PolicySelection(mockChildLbProvider, null))
+ .build());
+ }
+
+ Map subsetDistn = Maps.newLinkedHashMap();
+
+ for (DeterministicSubsettingLoadBalancerConfig config : configs) {
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+ verify(mockChildLb, atLeastOnce()).handleResolvedAddresses(resolvedAddrCaptor.capture());
+ // Verify ChildLB is only getting subsetSize ResolvedAddresses each time
+ assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(subsetSize);
+ for (EquivalentAddressGroup eag : resolvedAddrCaptor.getValue().getAddresses()) {
+ for (SocketAddress addr : eag.getAddresses()) {
+ Integer prev = subsetDistn.getOrDefault(addr, 0);
+ subsetDistn.put(addr, prev + 1);
+ }
+ }
+ }
+ int maxConns = Collections.max(subsetDistn.values());
+ int minConns = Collections.min(subsetDistn.values());
+
+ assertThat(maxConns <= minConns + maxDiff).isTrue();
+ }
+}
diff --git a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java
index 18f9bbf549f..da704643451 100644
--- a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java
+++ b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java
@@ -1037,7 +1037,7 @@ public void mathChecksOut() {
assertThat(stdev).isEqualTo(147.32277488562318);
}
- private static class FakeSocketAddress extends SocketAddress {
+ static class FakeSocketAddress extends SocketAddress {
final String name;