diff --git a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java index 3debc871121..26cfc38f2c0 100644 --- a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java +++ b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java @@ -41,7 +41,7 @@ public void getClassesViaHardcoded_classesPresent() throws Exception { @Test public void stockProviders() { LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry(); - assertThat(defaultRegistry.providers()).hasSize(4); + assertThat(defaultRegistry.providers()).hasSize(5); LoadBalancerProvider pickFirst = defaultRegistry.getProvider("pick_first"); assertThat(pickFirst).isInstanceOf(PickFirstLoadBalancerProvider.class); @@ -56,7 +56,13 @@ public void stockProviders() { "outlier_detection_experimental"); assertThat(outlierDetection.getClass().getName()).isEqualTo( "io.grpc.util.OutlierDetectionLoadBalancerProvider"); - assertThat(roundRobin.getPriority()).isEqualTo(5); + assertThat(outlierDetection.getPriority()).isEqualTo(5); + + LoadBalancerProvider deterministicSubsetting = defaultRegistry.getProvider( + "deterministic_subsetting"); + assertThat(deterministicSubsetting.getClass().getName()).isEqualTo( + "io.grpc.util.DeterministicSubsettingLoadBalancerProvider"); + assertThat(deterministicSubsetting.getPriority()).isEqualTo(5); LoadBalancerProvider grpclb = defaultRegistry.getProvider("grpclb"); assertThat(grpclb).isInstanceOf(GrpclbLoadBalancerProvider.class); diff --git a/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancer.java b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancer.java new file mode 100644 index 00000000000..f0301cef0e3 --- /dev/null +++ b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancer.java @@ -0,0 +1,210 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.EquivalentAddressGroup; +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Random; + +/** + * Wraps a child {@code LoadBalancer}, separating the total set of backends into smaller subsets for + * the child balancer to balance across. + * + *

This implements deterministic subsetting gRFC: + * https://github.com/grpc/proposal/blob/master/A68-deterministic-subsetting-lb-policy.md + */ +@Internal +public final class DeterministicSubsettingLoadBalancer extends LoadBalancer { + + private final GracefulSwitchLoadBalancer switchLb; + + @Override + public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + DeterministicSubsettingLoadBalancerConfig config = + (DeterministicSubsettingLoadBalancerConfig) + resolvedAddresses.getLoadBalancingPolicyConfig(); + + switchLb.switchTo(config.childPolicy.getProvider()); + + ResolvedAddresses subsetAddresses = buildSubsets(resolvedAddresses, config); + + switchLb.handleResolvedAddresses( + subsetAddresses.toBuilder() + .setLoadBalancingPolicyConfig(config.childPolicy.getConfig()) + .build()); + return true; + } + + // implements the subsetting algorithm, as described in A68: + // https://github.com/grpc/proposal/pull/383 + private ResolvedAddresses buildSubsets( + ResolvedAddresses allAddresses, DeterministicSubsettingLoadBalancerConfig config) { + // The map should only retain entries for addresses in this latest update. + ArrayList addresses = new ArrayList<>(); + for (EquivalentAddressGroup addressGroup : allAddresses.getAddresses()) { + addresses.addAll(addressGroup.getAddresses()); + } + + if (addresses.size() <= config.subsetSize) { + return allAddresses; + } + if (config.sortAddresses) { + // If we sort, we do so via the string representation of the SocketAddress. + addresses.sort(new AddressComparator()); + } + + Integer backendCount = addresses.size(); + Integer subsetCount = backendCount / config.subsetSize; + + Integer round = config.clientIndex / subsetCount; + + Integer excludedCount = backendCount % config.subsetSize; + Integer excludedStart = (round * excludedCount) % backendCount; + Integer excludedEnd = (excludedStart + excludedCount) % backendCount; + if (excludedStart <= excludedEnd) { + List subList = addresses.subList(0, excludedStart); + subList.addAll(addresses.subList(excludedEnd, backendCount)); + addresses = new ArrayList<>(subList); + } else { + addresses = new ArrayList<>(addresses.subList(excludedEnd, excludedStart)); + } + + Random r = new Random(round); + Collections.shuffle(addresses, r); + + Integer subsetId = config.clientIndex % subsetCount; + + Integer start = subsetId * config.subsetSize; + Integer end = start + config.subsetSize; + + List subset = addresses.subList(start, end); + + ArrayList eaglist = new ArrayList<>(); + + // Create new EAGs per address + for (SocketAddress addr : subset) { + eaglist.add(new EquivalentAddressGroup(addr)); + } + + ResolvedAddresses.Builder builder = allAddresses.toBuilder(); + return builder.setAddresses(eaglist).build(); + } + + @Override + public void handleNameResolutionError(Status error) { + switchLb.handleNameResolutionError(error); + } + + @Override + public void shutdown() { + switchLb.shutdown(); + } + + public DeterministicSubsettingLoadBalancer(Helper helper) { + switchLb = new GracefulSwitchLoadBalancer(checkNotNull(helper, "helper")); + } + + @VisibleForTesting + static class AddressComparator implements Comparator { + // For consistency with the golang instrumentation, this assumes toString is overridden such + // that it is a string representation of an IP. Though any string representation of a + // SocketAddress will work here, other definitions of toString may yield differing results from + // the golang instrumentation. + @Override + public int compare(SocketAddress o1, SocketAddress o2) { + return o1.toString().compareTo(o2.toString()); + } + } + + public static final class DeterministicSubsettingLoadBalancerConfig { + + public final Integer clientIndex; + public final Integer subsetSize; + public final Boolean sortAddresses; + + public final PolicySelection childPolicy; + + private DeterministicSubsettingLoadBalancerConfig( + Integer clientIndex, + Integer subsetSize, + Boolean sortAddresses, + PolicySelection childPolicy) { + this.clientIndex = clientIndex; + this.subsetSize = subsetSize; + this.sortAddresses = sortAddresses; + this.childPolicy = childPolicy; + } + + public static class Builder { + Integer clientIndex; + Integer subsetSize = 10; + + Boolean sortAddresses; + PolicySelection childPolicy; + + public Builder setClientIndex(Integer clientIndex) { + checkState(clientIndex != null); + // Indices must be positive integers. + checkState(clientIndex >= 0); + this.clientIndex = clientIndex; + return this; + } + + public Builder setSubsetSize(Integer subsetSize) { + checkArgument(subsetSize != null); + // subsetSize of 1 is equivalent to `pick_first`. Use that policy if that behavior is + // desired. + // Fallback to default of 10 of condition is not satisfied. + checkArgument(subsetSize > 1); + this.subsetSize = subsetSize; + return this; + } + + public Builder setSortAddresses(Boolean sortAddresses) { + checkArgument(sortAddresses != null); + this.sortAddresses = sortAddresses; + return this; + } + + public Builder setChildPolicy(PolicySelection childPolicy) { + checkState(childPolicy != null); + this.childPolicy = childPolicy; + return this; + } + + public DeterministicSubsettingLoadBalancerConfig build() { + checkState(childPolicy != null); + checkState(clientIndex != null); + return new DeterministicSubsettingLoadBalancerConfig( + clientIndex, subsetSize, sortAddresses, childPolicy); + } + } + } +} diff --git a/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancerProvider.java b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancerProvider.java new file mode 100644 index 00000000000..bc967564b0b --- /dev/null +++ b/util/src/main/java/io/grpc/util/DeterministicSubsettingLoadBalancerProvider.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import java.util.List; +import java.util.Map; + +@Internal +public final class DeterministicSubsettingLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new DeterministicSubsettingLoadBalancer(helper); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "deterministic_subsetting"; + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) { + try { + return parseLoadBalancingPolicyConfigInternal(rawConfig); + } catch (RuntimeException e) { + return ConfigOrError.fromError( + Status.UNAVAILABLE + .withCause(e) + .withDescription("Failed parsing configuration for " + getPolicyName())); + } + } + + private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawConfig) { + Integer clientIndex = JsonUtil.getNumberAsInteger(rawConfig, "clientIndex"); + Integer subsetSize = JsonUtil.getNumberAsInteger(rawConfig, "subsetSize"); + Boolean sortAddresses = JsonUtil.getBoolean(rawConfig, "sortAddresses"); + + List childConfigCandidates = + ServiceConfigUtil.unwrapLoadBalancingConfigList( + JsonUtil.getListOfObjects(rawConfig, "childPolicy")); + if (childConfigCandidates == null || childConfigCandidates.isEmpty()) { + return ConfigOrError.fromError( + Status.INTERNAL.withDescription( + "No child policy in deterministic_subsetting LB policy " + rawConfig)); + } + + ConfigOrError selectedConfig = + ServiceConfigUtil.selectLbPolicyFromList( + childConfigCandidates, LoadBalancerRegistry.getDefaultRegistry()); + + DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig.Builder + configBuilder = + new DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig + .Builder(); + + configBuilder.setChildPolicy((PolicySelection) selectedConfig.getConfig()); + + if (clientIndex != null) { + configBuilder.setClientIndex(clientIndex); + } else { + return ConfigOrError.fromError( + Status.INTERNAL.withDescription( + "No client index set, cannot determine subsets " + rawConfig)); + } + + if (subsetSize != null) { + configBuilder.setSubsetSize(subsetSize); + } + + if (sortAddresses != null) { + configBuilder.setSortAddresses(sortAddresses); + } + return ConfigOrError.fromConfig(configBuilder.build()); + } +} diff --git a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index 1fdd69cb00b..9d36f44c511 100644 --- a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -1,2 +1,3 @@ io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider io.grpc.util.OutlierDetectionLoadBalancerProvider +io.grpc.util.DeterministicSubsettingLoadBalancerProvider diff --git a/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerProviderTest.java b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerProviderTest.java new file mode 100644 index 00000000000..36ead450bea --- /dev/null +++ b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerProviderTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import io.grpc.InternalServiceProviders; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonParser; +import io.grpc.util.DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig; +import java.io.IOException; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DeterministicSubsettingLoadBalancerProviderTest { + + private final DeterministicSubsettingLoadBalancerProvider provider = + new DeterministicSubsettingLoadBalancerProvider(); + + @Test + public void registered() { + for (LoadBalancerProvider current : + InternalServiceProviders.getCandidatesViaServiceLoader( + LoadBalancerProvider.class, getClass().getClassLoader())) { + if (current instanceof DeterministicSubsettingLoadBalancerProvider) { + return; + } + } + fail("DeterministicSubsettingLoadBalancerProvider not registered"); + } + + @Test + public void providesLoadBalancer() { + Helper helper = mock(Helper.class); + assertThat(provider.newLoadBalancer(helper)) + .isInstanceOf(DeterministicSubsettingLoadBalancer.class); + } + + @Test + public void parseConfigRequiresClientIdx() throws IOException { + String config = "{ \"childPolicy\" : [{\"round_robin\" : {}}] } "; + + ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(parseJsonObject(config)); + assertThat(configOrError.getError()).isNotNull(); + assertThat(configOrError.getError().toString()) + .isEqualTo( + Status.INTERNAL + .withDescription( + "No client index set, cannot determine subsets " + + "{childPolicy=[{round_robin={}}]}") + .toString()); + } + + @Test + public void parseConfigWithDefaults() throws IOException { + String lbConfig = + "{ \"clientIndex\" : 0, " + + "\"childPolicy\" : [{\"round_robin\" : {}}], " + + "\"sortAddresses\" : false }"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + System.out.println(configOrError); + assertThat(configOrError.getConfig()).isNotNull(); + DeterministicSubsettingLoadBalancerConfig config = + (DeterministicSubsettingLoadBalancerConfig) configOrError.getConfig(); + + assertThat(config.clientIndex).isEqualTo(0); + assertThat(config.sortAddresses).isEqualTo(false); + assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin"); + + assertThat(config.subsetSize).isEqualTo(10); + } + + @SuppressWarnings("unchecked") + private static Map parseJsonObject(String json) throws IOException { + return (Map) JsonParser.parse(json); + } + + @Test + public void parseConfigWithCustomSubsetSize() throws IOException { + String lbConfig = + "{ \"clientIndex\" : 0, " + + "\"subsetSize\" : 3, " + + "\"childPolicy\" : [{\"round_robin\" : {}}], " + + "\"sortAddresses\" : false }"; + + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getConfig()).isNotNull(); + DeterministicSubsettingLoadBalancerConfig config = + (DeterministicSubsettingLoadBalancerConfig) configOrError.getConfig(); + assertThat(config.subsetSize).isEqualTo(3); + } +} diff --git a/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerTest.java b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerTest.java new file mode 100644 index 00000000000..3c08c744077 --- /dev/null +++ b/util/src/test/java/io/grpc/util/DeterministicSubsettingLoadBalancerTest.java @@ -0,0 +1,473 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.internal.TestUtils; +import io.grpc.util.DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig; +import io.grpc.util.OutlierDetectionLoadBalancerTest.FakeSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +public class DeterministicSubsettingLoadBalancerTest { + + private List servers = Lists.newArrayList(); + private Map, Subchannel> subchannels = Maps.newLinkedHashMap(); + + private final Map subchannelStateListeners = + Maps.newLinkedHashMap(); + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Mock private LoadBalancer.Helper mockHelper; + @Mock private LoadBalancer mockChildLb; + @Mock private SocketAddress mockSocketAddress; + @Captor private ArgumentCaptor connectivityStateCaptor; + @Captor private ArgumentCaptor errorPickerCaptor; + + @Captor private ArgumentCaptor resolvedAddrCaptor; + + private final LoadBalancerProvider mockChildLbProvider = + new TestUtils.StandardLoadBalancerProvider("foo_policy") { + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return mockChildLb; + } + }; + + private DeterministicSubsettingLoadBalancer loadBalancer; + + private final LoadBalancerProvider roundRobinLbProvider = + new TestUtils.StandardLoadBalancerProvider("round_robin") { + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new RoundRobinLoadBalancer(helper); + } + }; + + private void setupBackends(int backendCount) { + servers = Lists.newArrayList(); + subchannels = Maps.newLinkedHashMap(); + for (int i = 0; i < backendCount; i++) { + SocketAddress addr = new FakeSocketAddress("server" + i); + EquivalentAddressGroup e = new EquivalentAddressGroup(addr); + servers.add(e); + Subchannel sc = mock(Subchannel.class); + subchannels.put(Arrays.asList(e), sc); + } + } + + @Before + public void setUp() { + loadBalancer = new DeterministicSubsettingLoadBalancer(mockHelper); + } + + public void addMock() { + when(mockHelper.createSubchannel(any(LoadBalancer.CreateSubchannelArgs.class))).then( + new Answer() { + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0]; + final Subchannel subchannel = subchannels.get(args.getAddresses()); + when(subchannel.getAllAddresses()).thenReturn(args.getAddresses()); + when(subchannel.getAttributes()).thenReturn(args.getAttributes()); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + subchannelStateListeners.put(subchannel, + (SubchannelStateListener) invocation.getArguments()[0]); + return null; + } + }).when(subchannel).start(any(SubchannelStateListener.class)); + return subchannel; + } + }); + } + + @Test + public void handleNameResoutionError_noChildLb() { + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + + verify(mockHelper) + .updateBalancingState(connectivityStateCaptor.capture(), errorPickerCaptor.capture()); + assertThat(connectivityStateCaptor.getValue()).isEqualTo(ConnectivityState.TRANSIENT_FAILURE); + } + + @Test + public void handleNameResolutionError_withChildLb() { + DeterministicSubsettingLoadBalancerConfig config = + new DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(2) + .setClientIndex(0) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)) + .build(); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress))) + .setLoadBalancingPolicyConfig(config) + .build()); + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + + verify(mockChildLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED); + } + + @Test + public void shutdown() { + DeterministicSubsettingLoadBalancerConfig config = + new DeterministicSubsettingLoadBalancer.DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(2) + .setClientIndex(0) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)) + .build(); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress))) + .setLoadBalancingPolicyConfig(config) + .build()); + loadBalancer.shutdown(); + verify(mockChildLb).shutdown(); + } + + @Test + public void addressComparator() { + setupBackends(5); + List sorted = Lists.newArrayList(); + for (EquivalentAddressGroup eag : servers) { + sorted.addAll(eag.getAddresses()); + } + + Collections.shuffle(servers); + List addresses = Lists.newArrayList(); + for (EquivalentAddressGroup eag : servers) { + addresses.addAll(eag.getAddresses()); + } + + assertThat(addresses).isNotEqualTo(sorted); + addresses.sort(new DeterministicSubsettingLoadBalancer.AddressComparator()); + + assertThat(addresses).isEqualTo(sorted); + } + + @Test + public void acceptResolvedAddresses_mocked() { + int subsetSize = 3; + DeterministicSubsettingLoadBalancerConfig config = + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setClientIndex(0) + .setSortAddresses(true) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)) + .build(); + + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress))) + .setLoadBalancingPolicyConfig(config) + .build(); + + assertThat(loadBalancer.acceptResolvedAddresses(resolvedAddresses)).isTrue(); + + verify(mockChildLb) + .handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(config.childPolicy.getConfig()) + .build()); + } + + @Test + public void acceptResolvedAddresses() { + addMock(); + setupBackends(6); + int subsetSize = 3; + DeterministicSubsettingLoadBalancerConfig config = + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setClientIndex(0) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)) + .build(); + + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + + assertThat(loadBalancer.acceptResolvedAddresses(resolvedAddresses)).isTrue(); + + int insubset = 0; + for (Subchannel subchannel : subchannels.values()) { + LoadBalancer.SubchannelStateListener sc = subchannelStateListeners.get(subchannel); + if (sc != null) { // it might be null if it's not in the subset. + insubset += 1; + sc.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + } + } + + assertThat(insubset).isEqualTo(subsetSize); + } + + @Test + public void sortingBackends() { + setupBackends(4); + // Shuffle servers so that they're not in 0, 1, 2 order + List shuffledServers = + Lists.newArrayList(servers.get(1), servers.get(3), servers.get(2), servers.get(0)); + int subsetSize = 2; + DeterministicSubsettingLoadBalancerConfig sortConfig = + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setClientIndex(0) + .setSortAddresses(true) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)) + .build(); + + DeterministicSubsettingLoadBalancerConfig dontSortConfig = + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setClientIndex(0) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)) + .build(); + + List configs = + Lists.newArrayList(sortConfig, dontSortConfig); + List> actual = Lists.newArrayList(); + for (DeterministicSubsettingLoadBalancerConfig config : configs) { + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(shuffledServers)) + .setLoadBalancingPolicyConfig(config) + .build(); + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + verify(mockChildLb, atLeastOnce()).handleResolvedAddresses(resolvedAddrCaptor.capture()); + // Verify ChildLB is only getting subsetSize ResolvedAddresses each time + assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(subsetSize); + actual.add(resolvedAddrCaptor.getValue().getAddresses()); + } + List actualSorted = actual.get(0); + List actualUnsorted = actual.get(1); + + // We will sort, and then round 0 will shift from 0,1,2,3 to 3,0,1,2 + assertThat(actualSorted).isEqualTo(Lists.newArrayList(servers.get(3), servers.get(0))); + // We will not sort, but round 0 will shift from 1,3,2,0 to 0,1,3,2 (same order given indices) + assertThat(actualUnsorted).isEqualTo(Lists.newArrayList(servers.get(0), servers.get(1))); + } + + @Test + public void closesUnusedConns() { + addMock(); + setupBackends(6); + List configs = Lists.newArrayList(); + for (int i = 0; i < 2; i++) { + configs.add( + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(3) + .setClientIndex(i) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)) + .build()); + } + Iterator scIterator = subchannels.values().iterator(); + scIterator.next(); // subchannel0 + Subchannel subchannel1 = scIterator.next(); + Subchannel subchannel2 = scIterator.next(); + scIterator.next(); // subchannel3 + Subchannel subchannel4 = scIterator.next(); + scIterator.next(); // subchannel5 + + // In the first call to RR.acceptResolvedAddresses, all subchannels will be new + // with nothing to close. in the second iteration, we need to remove the subchannels + // from the first subset. + List> subsets = + Lists.newArrayList( + Lists.newArrayList(), Lists.newArrayList(subchannel4, subchannel1, subchannel2)); + int newconns = 0; + + for (int i = 0; i < 2; i++) { + DeterministicSubsettingLoadBalancerConfig config = configs.get(i); + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + for (Subchannel sc : subsets.get(i)) { + verify(sc).shutdown(); + } + for (Subchannel sc : subchannels.values()) { + LoadBalancer.SubchannelStateListener ssl = subchannelStateListeners.get(sc); + if (ssl != null) { + newconns += 1; + ssl.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + } + } + subchannelStateListeners.clear(); + } + assertThat(newconns).isEqualTo(6); + } + + @Test + public void reusesConns() { + addMock(); + setupBackends(3); + List configs = Lists.newArrayList(); + for (int i = 0; i < 3; i++) { + configs.add( + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(3) + .setClientIndex(i) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)) + .build()); + } + + List perRun = Lists.newArrayList(); + + for (DeterministicSubsettingLoadBalancerConfig config : configs) { + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + int numSubchannelsOpened = 0; + for (Subchannel subchannel : subchannels.values()) { + LoadBalancer.SubchannelStateListener sc = subchannelStateListeners.get(subchannel); + if (sc != null) { + sc.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + numSubchannelsOpened += 1; + } + } + perRun.add(numSubchannelsOpened); + subchannelStateListeners.clear(); + } + assertThat(perRun).isEqualTo(Lists.newArrayList(3, 0, 0)); + } + + @Test + public void backendsCanBeDistributedEvenly() { + // Backends can be distributed evenly, so they should be. Therefore, maxDiff = 0 + verifyCreatesSubsets(12, 8, 3, 0); + } + + @Test + public void backendsCanNotBeDistributedEvenly() { + // Backends can't be distributed evenly because there are excluded backends in every round and + // not enough clients to fill the last round. This provides 2 opportunities for a backend to be + // excluded, so the maxDiff is its maximum, 2 + verifyCreatesSubsets(37, 22, 5, 2); + } + + @Test + public void notEnoughClientsForLastRound() { + // There are no excluded backends in each round, but there are not enough clients for the + // last round, meaning there is only one chance for a backend to be excluded. + // Therefore, maxDiff =1 + verifyCreatesSubsets(20, 7, 5, 1); + } + + @Test + public void excludedBackendsInEveryRound() { + // There are enough clients to fill the last round, but there are excluded backends + // in every round, meaning there is only one chance for a backend to be excluded. + // Therefore, maxDiff =1 + verifyCreatesSubsets(21, 8, 5, 1); + } + + @Test + public void excludedStartBiggerThanEnd() { + // There are 3 excluded backends on each round, and sometimes the selected excluded backends + // wrap around. + verifyCreatesSubsets(7, 3, 4, 1); + } + + public void verifyCreatesSubsets(int backends, int clients, int subsetSize, int maxDiff) { + setupBackends(backends); + List configs = Lists.newArrayList(); + for (int i = 0; i < clients; i++) { + configs.add( + new DeterministicSubsettingLoadBalancerConfig.Builder() + .setSubsetSize(subsetSize) + .setClientIndex(i) + .setSortAddresses(false) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)) + .build()); + } + + Map subsetDistn = Maps.newLinkedHashMap(); + + for (DeterministicSubsettingLoadBalancerConfig config : configs) { + ResolvedAddresses resolvedAddresses = + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.copyOf(servers)) + .setLoadBalancingPolicyConfig(config) + .build(); + loadBalancer.acceptResolvedAddresses(resolvedAddresses); + verify(mockChildLb, atLeastOnce()).handleResolvedAddresses(resolvedAddrCaptor.capture()); + // Verify ChildLB is only getting subsetSize ResolvedAddresses each time + assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(subsetSize); + for (EquivalentAddressGroup eag : resolvedAddrCaptor.getValue().getAddresses()) { + for (SocketAddress addr : eag.getAddresses()) { + Integer prev = subsetDistn.getOrDefault(addr, 0); + subsetDistn.put(addr, prev + 1); + } + } + } + int maxConns = Collections.max(subsetDistn.values()); + int minConns = Collections.min(subsetDistn.values()); + + assertThat(maxConns <= minConns + maxDiff).isTrue(); + } +} diff --git a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java index 18f9bbf549f..da704643451 100644 --- a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -1037,7 +1037,7 @@ public void mathChecksOut() { assertThat(stdev).isEqualTo(147.32277488562318); } - private static class FakeSocketAddress extends SocketAddress { + static class FakeSocketAddress extends SocketAddress { final String name;