Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ You can deal with ephemeral tokens using the {@link io.vertx.serviceresolver.kub
The resolver calls the provider when it needs a fresh token. The token is cached by the resolver until it is detetected
stale by the resolver (upon a `401` server response code).

==== Matching specific service ports

When a service exposes more than one port, the resolver retains only a single port, it might not be the expected port.

You can build a specific service address for a given service port number.

[source,java]
----
{@link examples.ServiceResolverExamples#servicePortNumberMatching}
----

Or a given service port name.

[source,java]
----
{@link examples.ServiceResolverExamples#servicePortNameMatching}
----

=== SRV resolver

The SRV resolver uses DNS SRV records to resolve and locate services.
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/examples/ServiceResolverExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.vertx.serviceresolver.ServiceResolverClient;
import io.vertx.serviceresolver.kube.KubeResolver;
import io.vertx.serviceresolver.kube.KubeResolverOptions;
import io.vertx.serviceresolver.kube.KubernetesServiceAddressBuilder;
import io.vertx.serviceresolver.kube.impl.KubernetesServiceAddress;
import io.vertx.serviceresolver.srv.SrvResolver;
import io.vertx.serviceresolver.srv.SrvResolverOptions;

Expand Down Expand Up @@ -142,6 +144,20 @@ public void configuringKubernetesResolver(String host, int port, String namespac
.setWebSocketClientOptions(wsClientOptions);
}

public void servicePortNumberMatching() {
ServiceAddress serviceAddress = KubernetesServiceAddressBuilder
.of("the-service")
.withPortNumber(8080)
.build();
}

public void servicePortNameMatching(String portName) {
ServiceAddress serviceAddress = KubernetesServiceAddressBuilder
.of("the-service")
.withPortName(portName)
.build();
}

public void configuringSRVResolver(Vertx vertx, String dnsServer, int dnsPort) {

SrvResolverOptions options = new SrvResolverOptions()
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/serviceresolver/ServiceAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface ServiceAddress extends Address {
* @param name the service name
* @return the service address
*/
static ServiceAddress of(String name) {
public static ServiceAddress of(String name) {
Objects.requireNonNull(name);
return new ServiceAddress() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.vertx.serviceresolver.kube;

import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.kube.impl.KubernetesServiceAddress;

import java.util.Objects;

/**
* <p>Build a {@link ServiceAddress} for Kubernetes capable of distinguish a service endpoint
* by their <a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#endpointport-v1-core">port</a>.</p>
*
* <p>This is useful when dealing with pods exposing multiple ports.</p>
*/
public class KubernetesServiceAddressBuilder {

public static KubernetesServiceAddressBuilder of(String name) {
return new KubernetesServiceAddressBuilder(name);
}

private final String name;
private int portNumber = 0;
private String portName = null;

public KubernetesServiceAddressBuilder(String name) {
this.name = Objects.requireNonNull(name);
}

/**
* @return the fully build service address
*/
public ServiceAddress build() {
return new KubernetesServiceAddress(name, portNumber, portName);
}

/**
* Specify a port {@code number}, otherwise any port will be matched.
*
* @param number the port number, must be positive
* @return this builder
*/
public KubernetesServiceAddressBuilder withPortNumber(int number) {
if (number < 0) {
throw new IllegalArgumentException("Port number must be a positive integer");
}
this.portNumber = number;
return this;
}

/**
* Specify a TCP port {@code name}, otherwise any port will be matched.
*
* @param name the port name
* @return this builder
*/
public KubernetesServiceAddressBuilder withPortName(String name) {
this.portName = name;
return this;
}
}
51 changes: 51 additions & 0 deletions src/main/java/io/vertx/serviceresolver/kube/foo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"apiVersion": "v1",
"kind": "List",
"items": [
{
"apiVersion": "v1",
"kind": "Endpoints",
"metadata": {
"creationTimestamp": "2025-10-08T14:31:20.049213Z",
"generation": 1,
"labels": {
"app.kubernetes.io/name": "svc",
"app.kubernetes.io/version": "1.0"
},
"name": "svc",
"namespace": "test",
"resourceVersion": "3",
"uid": "f28931fe-942d-4391-90c9-31a4e03971da"
},
"subsets": [
{
"addresses": [
{
"ip": "0.0.0.0",
"targetRef": {
"kind": "Pod",
"name": "svc-0000-8080-8081",
"namespace": "test",
"uid": "5566d0e7-996b-43d8-b8d9-5950fa5ffc34"
}
}
],
"ports": [
{
"port": 8080,
"protocol": "TCP"
},
{
"port": 8081,
"protocol": "TCP"
}
]
}
]
}
],
"metadata": {
"resourceVersion": "3",
"selfLink": ""
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Future<KubeServiceState<B>> resolve(ServiceAddress address, EndpointBuild
Future<EndpoinsRequest<JsonObject>> endpointsFuture = requestEndpoints(token, 0);
return endpointsFuture
.compose(endpointsRequest -> {
KubeServiceState<B> state = new KubeServiceState<>(builder, address.name());
KubeServiceState<B> state = new KubeServiceState<>(builder, address, address.name());
JsonObject response = endpointsRequest.payload;
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
JsonArray items = response.getJsonArray("items");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.endpoint.EndpointBuilder;
import io.vertx.serviceresolver.ServiceAddress;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class KubeServiceState<B> {

final ServiceAddress address;
final String name;
final EndpointBuilder<B, SocketAddress> endpointsBuilder;
boolean disposed;
WebSocket webSocket;
AtomicReference<B> endpoints = new AtomicReference<>();
volatile boolean valid;

KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder, String name) {
KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder, ServiceAddress address, String name) {
this.endpointsBuilder = endpointsBuilder;
this.name = name;
this.address = address;
this.valid = true;
}

Expand Down Expand Up @@ -68,11 +71,22 @@ void updateEndpoints(JsonObject item) {
}
for (int k = 0;k < ports.size();k++) {
JsonObject port = ports.getJsonObject(k);
int podPort = port.getInteger("port");
int portNumber = port.getInteger("port");
String portName = port.getString("name");
if (address instanceof KubernetesServiceAddress) {
KubernetesServiceAddress kubernetesAddress = (KubernetesServiceAddress) address;
if (kubernetesAddress.portNumber > 0 && kubernetesAddress.portNumber != portNumber) {
continue;
}
if (kubernetesAddress.portName != null && !kubernetesAddress.portName.equals(portName)) {
continue;
}
}
for (String podIp : podIps) {
SocketAddress podAddress = SocketAddress.inetSocketAddress(podPort, podIp);
builder = builder.addServer(podAddress, podIp + "-" + podPort);
SocketAddress podAddress = SocketAddress.inetSocketAddress(portNumber, podIp);
builder = builder.addServer(podAddress, podIp + "-" + port);
}
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.vertx.serviceresolver.kube.impl;

import io.vertx.serviceresolver.ServiceAddress;

public class KubernetesServiceAddress implements ServiceAddress {

final String name;
final int portNumber;
final String portName;

public KubernetesServiceAddress(String name, int portNumber, String portName) {
this.name = name;
this.portNumber = portNumber;
this.portName = portName;
}

@Override
public String name() {
return name;
}
}
20 changes: 20 additions & 0 deletions src/test/java/io/vertx/tests/kube/KubeEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.vertx.tests.kube;

import java.util.Collections;
import java.util.Map;

public class KubeEndpoint {

final String ip;
final Map<Integer, String> ports;

public KubeEndpoint(String ip, int port) {
this.ip = ip;
this.ports = Collections.singletonMap(port, null);
}

public KubeEndpoint(String ip, Map<Integer, String> ports) {
this.ip = ip;
this.ports = ports;
}
}
17 changes: 17 additions & 0 deletions src/test/java/io/vertx/tests/kube/KubeServiceResolverTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.kube.KubeResolver;
import io.vertx.serviceresolver.kube.KubeResolverOptions;
import io.vertx.serviceresolver.kube.KubernetesServiceAddressBuilder;
import io.vertx.tests.HttpProxy;
import io.vertx.tests.ServiceResolverTestBase;
import org.junit.Ignore;
Expand Down Expand Up @@ -95,6 +96,22 @@ public void testNoPods(TestContext should) throws Exception {
}
}

@Test
public void testSome(TestContext should) throws Exception {
Handler<HttpServerRequest> server = req -> {
req.response().end("" + req.localAddress().port());
};
List<SocketAddress> pods = startPods(2, server);
ServiceAddress service = ServiceAddress.of("svc");
kubernetesMocking.buildAndRegisterBackendPod(service, kubernetesMocking.defaultNamespace(), KubeOp.CREATE, pods);
KubeEndpoint kubeEndpoint = new KubeEndpoint(pods.get(0).host(), Map.of(pods.get(0).port(), "p1", pods.get(1).port(), "p2"));
kubernetesMocking.buildAndRegisterKubernetesService(service.name(), kubernetesMocking.defaultNamespace(), KubeOp.CREATE, kubeEndpoint);
should.assertEquals("8080", get(KubernetesServiceAddressBuilder.of("svc").withPortName("p1").build()).toString());
should.assertEquals("8081", get(KubernetesServiceAddressBuilder.of("svc").withPortName("p2").build()).toString());
should.assertEquals("8080", get(KubernetesServiceAddressBuilder.of("svc").withPortNumber(8080).build()).toString());
should.assertEquals("8081", get(KubernetesServiceAddressBuilder.of("svc").withPortNumber(8081).build()).toString());
}

@Test
public void testUpdate() throws Exception {
Handler<HttpServerRequest> server = req -> {
Expand Down
60 changes: 38 additions & 22 deletions src/test/java/io/vertx/tests/kube/KubernetesMocking.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,40 +102,56 @@ public KubernetesClient client() {
// ips.forEach(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip));
// }

Endpoints buildAndRegisterKubernetesService(ServiceAddress service, String namespace, KubeOp op, List<SocketAddress> ipAdresses) {
return buildAndRegisterKubernetesService(service.name(), namespace, op, ipAdresses);
Endpoints buildAndRegisterKubernetesService(ServiceAddress service, String namespace, KubeOp op, List<SocketAddress> ipAddresses) {
return buildAndRegisterKubernetesService(service.name(), namespace, op, ipAddresses);
}

Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, KubeOp op, List<SocketAddress> ipAdresses) {
Endpoints buildAndRegisterKubernetesService(String applicationName,
String namespace,
KubeOp op,
List<SocketAddress> ipAddresses) {
return buildAndRegisterKubernetesService(
applicationName,
namespace,
op,
ipAddresses
.stream()
.map(ipAddress -> new KubeEndpoint(ipAddress.host(), ipAddress.port()))
.collect(Collectors.toList())
.toArray(KubeEndpoint[]::new)
);
}

Endpoints buildAndRegisterKubernetesService(String applicationName,
String namespace,
KubeOp op,
KubeEndpoint... endpoints_) {
Map<String, String> serviceLabels = new HashMap<>();
serviceLabels.put("app.kubernetes.io/name", applicationName);
serviceLabels.put("app.kubernetes.io/version", "1.0");

Map<Integer, List<EndpointAddress>> endpointAddressesMap = new LinkedHashMap<>();
for (SocketAddress sa : ipAdresses) {
List<EndpointAddress> endpointAddresses = endpointAddressesMap.compute(sa.port(), (integer, addresses) -> {
if (addresses == null) {
addresses = new ArrayList<>();
EndpointsBuilder endpointsBuilder = new EndpointsBuilder()
.withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata();

for (KubeEndpoint endpoint : endpoints_) {
EndpointSubsetBuilder builder = new EndpointSubsetBuilder();
StringBuilder sb = new StringBuilder(applicationName)
.append('-')
.append(endpoint.ip.replace(".", ""));
endpoint.ports.forEach((port, name) -> {
sb.append('-').append(port);
EndpointPortBuilder portBuilder = new EndpointPortBuilder().withPort(port).withProtocol("TCP");
if (name != null && !name.isEmpty()) {
portBuilder.withName(name);
}
return addresses;
builder.addToPorts(portBuilder.build());
});
ObjectReference targetRef = new ObjectReference(null, null, "Pod",
applicationName + "-" + ipAsSuffix(sa), namespace, null, UUID.randomUUID().toString());
EndpointAddress endpointAddress = new EndpointAddressBuilder().withIp(sa.host()).withTargetRef(targetRef)
.build();
endpointAddresses.add(endpointAddress);
sb.toString(), namespace, null, UUID.randomUUID().toString());
builder.withAddresses(new EndpointAddressBuilder().withIp(endpoint.ip).withTargetRef(targetRef).build());
endpointsBuilder.addToSubsets(builder.build());
}

EndpointsBuilder endpointsBuilder = new EndpointsBuilder()
.withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata();

endpointAddressesMap.forEach((port, addresses) -> {
endpointsBuilder.addToSubsets(new EndpointSubsetBuilder().withAddresses(addresses)
.addToPorts(new EndpointPort[] { new EndpointPortBuilder().withPort(port).withProtocol("TCP").build() })
.build());
});

NonNamespaceOperation<Endpoints, EndpointsList, Resource<Endpoints>> endpoints;
if (namespace != null) {
endpoints = client.endpoints().inNamespace(namespace);
Expand Down