Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_INSTANCE_ID_KEY;
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_SERVICE_NAME_KEY;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
Expand All @@ -34,13 +38,13 @@
import com.google.common.collect.Lists;

import io.grpc.Attributes;
import io.grpc.Attributes.Builder;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.SharedResourceHolder;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.common.util.GrpcUtils;

/**
* The DiscoveryClientNameResolver resolves the service hosts and their associated gRPC port using the channel's name
Expand All @@ -59,7 +63,7 @@ public class DiscoveryClientNameResolver extends NameResolver {
private final String name;
private final DiscoveryClient client;
private final SynchronizationContext syncContext;
private final Runnable externalCleaner;
private final Consumer<DiscoveryClientNameResolver> shutdownHook;
private final SharedResourceHolder.Resource<Executor> executorResource;
private final boolean usingExecutorResource;

Expand All @@ -78,27 +82,46 @@ public class DiscoveryClientNameResolver extends NameResolver {
* @param client The client used to look up the service addresses.
* @param args The name resolver args.
* @param executorResource The executor resource.
* @param externalCleaner The optional cleaner used during {@link #shutdown()}
* @param shutdownHook The optional cleaner used during {@link #shutdown()}
*/
public DiscoveryClientNameResolver(final String name, final DiscoveryClient client, final Args args,
final SharedResourceHolder.Resource<Executor> executorResource, final Runnable externalCleaner) {
final SharedResourceHolder.Resource<Executor> executorResource,
final Consumer<DiscoveryClientNameResolver> shutdownHook) {
this.name = name;
this.client = client;
this.syncContext = requireNonNull(args.getSynchronizationContext(), "syncContext");
this.externalCleaner = externalCleaner;
this.shutdownHook = shutdownHook;
this.executor = args.getOffloadExecutor();
this.usingExecutorResource = this.executor == null;
this.executorResource = executorResource;
}

/**
* Gets the name of the service to get the instances of.
*
* @return The name associated with this resolver.
*/
protected final String getName() {
return this.name;
}

/**
* Checks whether this resolver is active. E.g. {@code #start} has been called, but not {@code #shutdown()}.
*
* @return True, if there is a listener attached. False, otherwise.
*/
protected final boolean isActive() {
return this.listener != null;
}

@Override
public final String getServiceAuthority() {
return this.name;
}

@Override
public void start(final Listener2 listener) {
checkState(this.listener == null, "already started");
checkState(!isActive(), "already started");
if (this.usingExecutorResource) {
this.executor = SharedResourceHolder.get(this.executorResource);
}
Expand All @@ -108,7 +131,7 @@ public void start(final Listener2 listener) {

@Override
public void refresh() {
checkState(this.listener != null, "not started");
checkState(isActive(), "not started");
resolve();
}

Expand All @@ -120,19 +143,99 @@ public void refresh() {
*/
public void refreshFromExternal() {
this.syncContext.execute(() -> {
if (this.listener != null) {
if (isActive()) {
resolve();
}
});
}

/**
* Discovers matching service instances. Can be overwritten to apply some custom filtering.
*
* @return A list of service instances to use.
*/
protected List<ServiceInstance> discoverServers() {
return this.client.getInstances(this.name);
}

/**
* Extracts the gRPC server port from the given service instance. Can be overwritten for a custom port mapping.
*
* @param instance The instance to extract the port from.
* @return The gRPC server port.
* @throws IllegalArgumentException If the specified port definition couldn't be parsed.
*/
protected int getGrpcPort(final ServiceInstance instance) {
final Map<String, String> metadata = instance.getMetadata();
if (metadata == null || metadata.isEmpty()) {
return instance.getPort();
}
String portString = metadata.get(CLOUD_DISCOVERY_METADATA_PORT);
if (portString == null) {
portString = metadata.get(LEGACY_CLOUD_DISCOVERY_METADATA_PORT);
if (portString == null) {
return instance.getPort();
} else {
log.warn("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead",
LEGACY_CLOUD_DISCOVERY_METADATA_PORT, getName(), CLOUD_DISCOVERY_METADATA_PORT);
}
}
try {
return Integer.parseInt(portString);
} catch (final NumberFormatException e) {
// TODO: How to handle this case?
throw new IllegalArgumentException("Failed to parse gRPC port information from: " + instance, e);
}
}

/**
* Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
* custom attributes.
*
* @param serviceInstance The service instance to get them from.
* @return The newly created attributes for the given instance.
*/
protected Attributes getAttributes(final ServiceInstance serviceInstance) {
final Builder builder = Attributes.newBuilder();
builder.set(DISCOVERY_SERVICE_NAME_KEY, this.name);
builder.set(DISCOVERY_INSTANCE_ID_KEY, serviceInstance.getInstanceId());
return builder.build();
}

/**
* Checks whether this instance should update its connections.
*
* @param newInstanceList The new instances that should be compared to the stored ones.
* @return True, if the given instance list contains different entries than the stored ones.
*/
protected boolean needsToUpdateConnections(final List<ServiceInstance> newInstanceList) {
if (this.instanceList.size() != newInstanceList.size()) {
return true;
}
for (final ServiceInstance instance : this.instanceList) {
final int port = getGrpcPort(instance);
boolean isSame = false;
for (final ServiceInstance newInstance : newInstanceList) {
final int newPort = getGrpcPort(newInstance);
if (newInstance.getHost().equals(instance.getHost()) && port == newPort) {
isSame = true;
break;
}
}
if (!isSame) {
return true;
}
}
return false;
}

private void resolve() {
log.debug("Scheduled resolve for {}", this.name);
if (this.resolving) {
return;
}
this.resolving = true;
this.executor.execute(new Resolve(this.listener, this.instanceList));
this.executor.execute(new Resolve(this.listener));
}

@Override
Expand All @@ -142,8 +245,8 @@ public void shutdown() {
this.executor = SharedResourceHolder.release(this.executorResource, this.executor);
}
this.instanceList = Lists.newArrayList();
if (this.externalCleaner != null) {
this.externalCleaner.run();
if (this.shutdownHook != null) {
this.shutdownHook.accept(this);
}
}

Expand All @@ -157,34 +260,32 @@ public String toString() {
*/
private final class Resolve implements Runnable {

// The listener is stored in an extra variable to avoid NPEs if the resolver is shutdown while resolving
private final Listener2 savedListener;
private final List<ServiceInstance> savedInstanceList;

/**
* Creates a new Resolve that stores a snapshot of the relevant states of the resolver.
*
* @param listener The listener to send the results to.
* @param instanceList The current server instance list.
*/
Resolve(final Listener2 listener, final List<ServiceInstance> instanceList) {
Resolve(final Listener2 listener) {
this.savedListener = requireNonNull(listener, "listener");
this.savedInstanceList = requireNonNull(instanceList, "instanceList");
}

@Override
public void run() {
final AtomicReference<List<ServiceInstance>> resultContainer = new AtomicReference<>();
final AtomicReference<List<ServiceInstance>> resultContainer = new AtomicReference<>(KEEP_PREVIOUS);
try {
resultContainer.set(resolveInternal());
} catch (final Exception e) {
this.savedListener.onError(Status.UNAVAILABLE.withCause(e)
.withDescription("Failed to update server list for " + DiscoveryClientNameResolver.this.name));
.withDescription("Failed to update server list for " + getName()));
resultContainer.set(Lists.newArrayList());
} finally {
DiscoveryClientNameResolver.this.syncContext.execute(() -> {
DiscoveryClientNameResolver.this.resolving = false;
final List<ServiceInstance> result = resultContainer.get();
if (result != KEEP_PREVIOUS && DiscoveryClientNameResolver.this.listener != null) {
if (result != KEEP_PREVIOUS && isActive()) {
DiscoveryClientNameResolver.this.instanceList = result;
}
});
Expand All @@ -198,98 +299,45 @@ public void run() {
* should be used.
*/
private List<ServiceInstance> resolveInternal() {
final String name = DiscoveryClientNameResolver.this.name;
final List<ServiceInstance> newInstanceList =
DiscoveryClientNameResolver.this.client.getInstances(name);
log.debug("Got {} candidate servers for {}", newInstanceList.size(), name);
// Discover servers
final List<ServiceInstance> newInstanceList = discoverServers();
if (CollectionUtils.isEmpty(newInstanceList)) {
log.error("No servers found for {}", name);
this.savedListener.onError(Status.UNAVAILABLE.withDescription("No servers found for " + name));
log.error("No servers found for {}", getName());
this.savedListener.onError(Status.UNAVAILABLE.withDescription("No servers found for " + getName()));
return Lists.newArrayList();
} else {
log.debug("Got {} candidate servers for {}", newInstanceList.size(), getName());
}

// Check for changes
if (!needsToUpdateConnections(newInstanceList)) {
log.debug("Nothing has changed... skipping update for {}", name);
log.debug("Nothing has changed... skipping update for {}", getName());
return KEEP_PREVIOUS;
}
log.debug("Ready to update server list for {}", name);
final List<EquivalentAddressGroup> targets = Lists.newArrayList();
for (final ServiceInstance instance : newInstanceList) {
final int port = getGRPCPort(instance);
log.debug("Found gRPC server {}:{} for {}", instance.getHost(), port, name);
targets.add(new EquivalentAddressGroup(
new InetSocketAddress(instance.getHost(), port), Attributes.EMPTY));
}
if (targets.isEmpty()) {
log.error("None of the servers for {} specified a gRPC port", name);
this.savedListener.onError(Status.UNAVAILABLE
.withDescription("None of the servers for " + name + " specified a gRPC port"));
return Lists.newArrayList();
} else {
this.savedListener.onResult(ResolutionResult.newBuilder()
.setAddresses(targets)
.build());
log.info("Done updating server list for {}", name);
return newInstanceList;
}

// Set new servers
log.debug("Ready to update server list for {}", getName());
this.savedListener.onResult(ResolutionResult.newBuilder()
.setAddresses(toTargets(newInstanceList))
.build());
log.info("Done updating server list for {}", getName());
return newInstanceList;
}

/**
* Extracts the gRPC server port from the given service instance.
*
* @param instance The instance to extract the port from.
* @return The gRPC server port.
* @throws IllegalArgumentException If the specified port definition couldn't be parsed.
*/
private int getGRPCPort(final ServiceInstance instance) {
final Map<String, String> metadata = instance.getMetadata();
if (metadata == null) {
return instance.getPort();
}
String portString = metadata.get(GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT);
if (portString == null) {
portString = metadata.get(LEGACY_CLOUD_DISCOVERY_METADATA_PORT);
if (portString == null) {
return instance.getPort();
} else {
log.warn("Found legacy grpc port metadata '{}' for client '{}' use '{}' instead",
LEGACY_CLOUD_DISCOVERY_METADATA_PORT, DiscoveryClientNameResolver.this.name,
GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT);
}
}
try {
return Integer.parseInt(portString);
} catch (final NumberFormatException e) {
// TODO: How to handle this case?
throw new IllegalArgumentException("Failed to parse gRPC port information from: " + instance, e);
private List<EquivalentAddressGroup> toTargets(final List<ServiceInstance> newInstanceList) {
final List<EquivalentAddressGroup> targets = Lists.newArrayList();
for (final ServiceInstance instance : newInstanceList) {
targets.add(toTarget(instance));
}
return targets;
}

/**
* Checks whether this instance should update its connections.
*
* @param newInstanceList The new instances that should be compared to the stored ones.
* @return True, if the given instance list contains different entries than the stored ones.
*/
private boolean needsToUpdateConnections(final List<ServiceInstance> newInstanceList) {
if (this.savedInstanceList.size() != newInstanceList.size()) {
return true;
}
for (final ServiceInstance instance : this.savedInstanceList) {
final int port = getGRPCPort(instance);
boolean isSame = false;
for (final ServiceInstance newInstance : newInstanceList) {
final int newPort = getGRPCPort(newInstance);
if (newInstance.getHost().equals(instance.getHost())
&& port == newPort) {
isSame = true;
break;
}
}
if (!isSame) {
return true;
}
}
return false;
private EquivalentAddressGroup toTarget(final ServiceInstance instance) {
final String host = instance.getHost();
final int port = getGrpcPort(instance);
final Attributes attributes = getAttributes(instance);
log.debug("Found gRPC server {}:{} for {}", host, port, getName());
return new EquivalentAddressGroup(new InetSocketAddress(host, port), attributes);
}

}
Expand Down
Loading