Skip to content

Commit 9cf9a1b

Browse files
committed
Kubernetes service address.
Motivation: A K8S service sometimes carries more than a single service per endpoint, when a service is resolved by name, the resolver can get multiple ports and cannot determine the port to use. Changes: Introduce a Kubernetes service builder that allows to specify the port name or the port number, so the resolver can use it to perform a precise match.
1 parent b5ec4e1 commit 9cf9a1b

File tree

10 files changed

+255
-26
lines changed

10 files changed

+255
-26
lines changed

src/main/asciidoc/index.adoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ You can override these settings.
133133
{@link examples.ServiceResolverExamples#configuringKubernetesResolver}
134134
----
135135

136+
==== Matching specific service ports
137+
138+
When a service exposes more than one port, the resolver retains only a single port, it might not be the expected port.
139+
140+
You can build a specific service address for a given service port number.
141+
142+
[source,java]
143+
----
144+
{@link examples.ServiceResolverExamples#servicePortNumberMatching}
145+
----
146+
147+
Or a given service port name.
148+
149+
[source,java]
150+
----
151+
{@link examples.ServiceResolverExamples#servicePortNameMatching}
152+
----
153+
136154
=== SRV resolver
137155

138156
The SRV resolver uses DNS SRV records to resolve and locate services.

src/main/java/examples/ServiceResolverExamples.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.vertx.serviceresolver.ServiceResolverClient;
2525
import io.vertx.serviceresolver.kube.KubeResolver;
2626
import io.vertx.serviceresolver.kube.KubeResolverOptions;
27+
import io.vertx.serviceresolver.kube.KubernetesServiceAddressBuilder;
28+
import io.vertx.serviceresolver.kube.impl.KubernetesServiceAddress;
2729
import io.vertx.serviceresolver.srv.SrvResolver;
2830
import io.vertx.serviceresolver.srv.SrvResolverOptions;
2931

@@ -125,6 +127,20 @@ public void configuringKubernetesResolver(String host, int port, String namespac
125127
.setWebSocketClientOptions(wsClientOptions);
126128
}
127129

130+
public void servicePortNumberMatching() {
131+
ServiceAddress serviceAddress = KubernetesServiceAddressBuilder
132+
.of("the-service")
133+
.withPortNumber(8080)
134+
.build();
135+
}
136+
137+
public void servicePortNameMatching(String portName) {
138+
ServiceAddress serviceAddress = KubernetesServiceAddressBuilder
139+
.of("the-service")
140+
.withPortName(portName)
141+
.build();
142+
}
143+
128144
public void configuringSRVResolver(Vertx vertx, String dnsServer, int dnsPort) {
129145

130146
SrvResolverOptions options = new SrvResolverOptions()

src/main/java/io/vertx/serviceresolver/ServiceAddress.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public interface ServiceAddress extends Address {
2929
* @param name the service name
3030
* @return the service address
3131
*/
32-
static ServiceAddress of(String name) {
32+
public static ServiceAddress of(String name) {
3333
Objects.requireNonNull(name);
3434
return new ServiceAddress() {
3535
@Override
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.vertx.serviceresolver.kube;
2+
3+
import io.vertx.serviceresolver.ServiceAddress;
4+
import io.vertx.serviceresolver.kube.impl.KubernetesServiceAddress;
5+
6+
import java.util.Objects;
7+
8+
/**
9+
* <p>Build a {@link ServiceAddress} for Kubernetes capable of distinguish a service endpoint
10+
* by their <a href="https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#endpointport-v1-core">port</a>.</p>
11+
*
12+
* <p>This is useful when dealing with pods exposing multiple ports.</p>
13+
*/
14+
public class KubernetesServiceAddressBuilder {
15+
16+
public static KubernetesServiceAddressBuilder of(String name) {
17+
return new KubernetesServiceAddressBuilder(name);
18+
}
19+
20+
private final String name;
21+
private int portNumber = 0;
22+
private String portName = null;
23+
24+
public KubernetesServiceAddressBuilder(String name) {
25+
this.name = Objects.requireNonNull(name);
26+
}
27+
28+
/**
29+
* @return the fully build service address
30+
*/
31+
public ServiceAddress build() {
32+
return new KubernetesServiceAddress(name, portNumber, portName);
33+
}
34+
35+
/**
36+
* Specify a port {@code number}, otherwise any port will be matched.
37+
*
38+
* @param number the port number, must be positive
39+
* @return this builder
40+
*/
41+
public KubernetesServiceAddressBuilder withPortNumber(int number) {
42+
if (number < 0) {
43+
throw new IllegalArgumentException("Port number must be a positive integer");
44+
}
45+
this.portNumber = number;
46+
return this;
47+
}
48+
49+
/**
50+
* Specify a TCP port {@code name}, otherwise any port will be matched.
51+
*
52+
* @param name the port name
53+
* @return this builder
54+
*/
55+
public KubernetesServiceAddressBuilder withPortName(String name) {
56+
this.portName = name;
57+
return this;
58+
}
59+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"apiVersion": "v1",
3+
"kind": "List",
4+
"items": [
5+
{
6+
"apiVersion": "v1",
7+
"kind": "Endpoints",
8+
"metadata": {
9+
"creationTimestamp": "2025-10-08T14:31:20.049213Z",
10+
"generation": 1,
11+
"labels": {
12+
"app.kubernetes.io/name": "svc",
13+
"app.kubernetes.io/version": "1.0"
14+
},
15+
"name": "svc",
16+
"namespace": "test",
17+
"resourceVersion": "3",
18+
"uid": "f28931fe-942d-4391-90c9-31a4e03971da"
19+
},
20+
"subsets": [
21+
{
22+
"addresses": [
23+
{
24+
"ip": "0.0.0.0",
25+
"targetRef": {
26+
"kind": "Pod",
27+
"name": "svc-0000-8080-8081",
28+
"namespace": "test",
29+
"uid": "5566d0e7-996b-43d8-b8d9-5950fa5ffc34"
30+
}
31+
}
32+
],
33+
"ports": [
34+
{
35+
"port": 8080,
36+
"protocol": "TCP"
37+
},
38+
{
39+
"port": 8081,
40+
"protocol": "TCP"
41+
}
42+
]
43+
}
44+
]
45+
}
46+
],
47+
"metadata": {
48+
"resourceVersion": "3",
49+
"selfLink": ""
50+
}
51+
}

src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,22 @@ private void updateEndpoints(JsonObject item) {
134134
}
135135
for (int k = 0;k < ports.size();k++) {
136136
JsonObject port = ports.getJsonObject(k);
137-
int podPort = port.getInteger("port");
137+
int portNumber = port.getInteger("port");
138+
String portName = port.getString("name");
139+
if (address instanceof KubernetesServiceAddress) {
140+
KubernetesServiceAddress kubernetesAddress = (KubernetesServiceAddress) address;
141+
if (kubernetesAddress.portNumber > 0 && kubernetesAddress.portNumber != portNumber) {
142+
continue;
143+
}
144+
if (kubernetesAddress.portName != null && !kubernetesAddress.portName.equals(portName)) {
145+
continue;
146+
}
147+
}
138148
for (String podIp : podIps) {
139-
SocketAddress podAddress = SocketAddress.inetSocketAddress(podPort, podIp);
140-
builder = builder.addServer(podAddress, podIp + "-" + podPort);
149+
SocketAddress podAddress = SocketAddress.inetSocketAddress(portNumber, podIp);
150+
builder = builder.addServer(podAddress, podIp + "-" + port);
141151
}
152+
break;
142153
}
143154
}
144155
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.vertx.serviceresolver.kube.impl;
2+
3+
import io.vertx.serviceresolver.ServiceAddress;
4+
5+
public class KubernetesServiceAddress implements ServiceAddress {
6+
7+
final String name;
8+
final int portNumber;
9+
final String portName;
10+
11+
public KubernetesServiceAddress(String name, int portNumber, String portName) {
12+
this.name = name;
13+
this.portNumber = portNumber;
14+
this.portName = portName;
15+
}
16+
17+
@Override
18+
public String name() {
19+
return name;
20+
}
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.vertx.tests.kube;
2+
3+
import java.util.Collections;
4+
import java.util.Map;
5+
6+
public class KubeEndpoint {
7+
8+
final String ip;
9+
final Map<Integer, String> ports;
10+
11+
public KubeEndpoint(String ip, int port) {
12+
this.ip = ip;
13+
this.ports = Collections.singletonMap(port, null);
14+
}
15+
16+
public KubeEndpoint(String ip, Map<Integer, String> ports) {
17+
this.ip = ip;
18+
this.ports = ports;
19+
}
20+
}

src/test/java/io/vertx/tests/kube/KubeServiceResolverTestBase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.vertx.serviceresolver.ServiceAddress;
1010
import io.vertx.serviceresolver.kube.KubeResolver;
1111
import io.vertx.serviceresolver.kube.KubeResolverOptions;
12+
import io.vertx.serviceresolver.kube.KubernetesServiceAddressBuilder;
1213
import io.vertx.tests.HttpProxy;
1314
import io.vertx.tests.ServiceResolverTestBase;
1415
import org.junit.Ignore;
@@ -92,6 +93,22 @@ public void testNoPods(TestContext should) throws Exception {
9293
}
9394
}
9495

96+
@Test
97+
public void testSome(TestContext should) throws Exception {
98+
Handler<HttpServerRequest> server = req -> {
99+
req.response().end("" + req.localAddress().port());
100+
};
101+
List<SocketAddress> pods = startPods(2, server);
102+
ServiceAddress service = ServiceAddress.of("svc");
103+
kubernetesMocking.buildAndRegisterBackendPod(service, kubernetesMocking.defaultNamespace(), KubeOp.CREATE, pods);
104+
KubeEndpoint kubeEndpoint = new KubeEndpoint(pods.get(0).host(), Map.of(pods.get(0).port(), "p1", pods.get(1).port(), "p2"));
105+
kubernetesMocking.buildAndRegisterKubernetesService(service.name(), kubernetesMocking.defaultNamespace(), KubeOp.CREATE, kubeEndpoint);
106+
should.assertEquals("8080", get(KubernetesServiceAddressBuilder.of("svc").withPortName("p1").build()).toString());
107+
should.assertEquals("8081", get(KubernetesServiceAddressBuilder.of("svc").withPortName("p2").build()).toString());
108+
should.assertEquals("8080", get(KubernetesServiceAddressBuilder.of("svc").withPortNumber(8080).build()).toString());
109+
should.assertEquals("8081", get(KubernetesServiceAddressBuilder.of("svc").withPortNumber(8081).build()).toString());
110+
}
111+
95112
@Test
96113
public void testUpdate() throws Exception {
97114
Handler<HttpServerRequest> server = req -> {

src/test/java/io/vertx/tests/kube/KubernetesMocking.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,40 +102,56 @@ public KubernetesClient client() {
102102
// ips.forEach(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip));
103103
// }
104104

105-
Endpoints buildAndRegisterKubernetesService(ServiceAddress service, String namespace, KubeOp op, List<SocketAddress> ipAdresses) {
106-
return buildAndRegisterKubernetesService(service.name(), namespace, op, ipAdresses);
105+
Endpoints buildAndRegisterKubernetesService(ServiceAddress service, String namespace, KubeOp op, List<SocketAddress> ipAddresses) {
106+
return buildAndRegisterKubernetesService(service.name(), namespace, op, ipAddresses);
107107
}
108108

109-
Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, KubeOp op, List<SocketAddress> ipAdresses) {
109+
Endpoints buildAndRegisterKubernetesService(String applicationName,
110+
String namespace,
111+
KubeOp op,
112+
List<SocketAddress> ipAddresses) {
113+
return buildAndRegisterKubernetesService(
114+
applicationName,
115+
namespace,
116+
op,
117+
ipAddresses
118+
.stream()
119+
.map(ipAddress -> new KubeEndpoint(ipAddress.host(), ipAddress.port()))
120+
.collect(Collectors.toList())
121+
.toArray(KubeEndpoint[]::new)
122+
);
123+
}
110124

125+
Endpoints buildAndRegisterKubernetesService(String applicationName,
126+
String namespace,
127+
KubeOp op,
128+
KubeEndpoint... endpoints_) {
111129
Map<String, String> serviceLabels = new HashMap<>();
112130
serviceLabels.put("app.kubernetes.io/name", applicationName);
113131
serviceLabels.put("app.kubernetes.io/version", "1.0");
114132

115-
Map<Integer, List<EndpointAddress>> endpointAddressesMap = new LinkedHashMap<>();
116-
for (SocketAddress sa : ipAdresses) {
117-
List<EndpointAddress> endpointAddresses = endpointAddressesMap.compute(sa.port(), (integer, addresses) -> {
118-
if (addresses == null) {
119-
addresses = new ArrayList<>();
133+
EndpointsBuilder endpointsBuilder = new EndpointsBuilder()
134+
.withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata();
135+
136+
for (KubeEndpoint endpoint : endpoints_) {
137+
EndpointSubsetBuilder builder = new EndpointSubsetBuilder();
138+
StringBuilder sb = new StringBuilder(applicationName)
139+
.append('-')
140+
.append(endpoint.ip.replace(".", ""));
141+
endpoint.ports.forEach((port, name) -> {
142+
sb.append('-').append(port);
143+
EndpointPortBuilder portBuilder = new EndpointPortBuilder().withPort(port).withProtocol("TCP");
144+
if (name != null && !name.isEmpty()) {
145+
portBuilder.withName(name);
120146
}
121-
return addresses;
147+
builder.addToPorts(portBuilder.build());
122148
});
123149
ObjectReference targetRef = new ObjectReference(null, null, "Pod",
124-
applicationName + "-" + ipAsSuffix(sa), namespace, null, UUID.randomUUID().toString());
125-
EndpointAddress endpointAddress = new EndpointAddressBuilder().withIp(sa.host()).withTargetRef(targetRef)
126-
.build();
127-
endpointAddresses.add(endpointAddress);
150+
sb.toString(), namespace, null, UUID.randomUUID().toString());
151+
builder.withAddresses(new EndpointAddressBuilder().withIp(endpoint.ip).withTargetRef(targetRef).build());
152+
endpointsBuilder.addToSubsets(builder.build());
128153
}
129154

130-
EndpointsBuilder endpointsBuilder = new EndpointsBuilder()
131-
.withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata();
132-
133-
endpointAddressesMap.forEach((port, addresses) -> {
134-
endpointsBuilder.addToSubsets(new EndpointSubsetBuilder().withAddresses(addresses)
135-
.addToPorts(new EndpointPort[] { new EndpointPortBuilder().withPort(port).withProtocol("TCP").build() })
136-
.build());
137-
});
138-
139155
NonNamespaceOperation<Endpoints, EndpointsList, Resource<Endpoints>> endpoints;
140156
if (namespace != null) {
141157
endpoints = client.endpoints().inNamespace(namespace);

0 commit comments

Comments
 (0)