Skip to content

Commit b5ec4e1

Browse files
committed
Reimplement Kubernetes resolver WebSocket failures.
Motivation: The implementation of the Kubernetes resolver relies on watching a Kubernetes resource through a WebSocket to maintain its state. When the WebSocket fails, a reconnection is attemped which might fails, in particular the server might respond with a 410 GONE response indicating the the watched resource is not available anymore. We could handle such failure in the resolver itself and perform a new GET then Watch operation, but it turns out that the endpoint resolver implements this already. Changes: When the resolver resolves a service, make the WebSocket connect part of the resolution and not a side effect of the resolution, hence if the WebSocket cannot connect, the resolution fails and the HTTP client reacts accordingly. When the WebSocket is disconnected, mark the kube service state as invalid, this state is probed by the HTTP client and will attempt a new resolution leading to a new GET then Watch sequence in the resolver.
1 parent 16484fe commit b5ec4e1

File tree

3 files changed

+67
-88
lines changed

3 files changed

+67
-88
lines changed

src/main/java/io/vertx/serviceresolver/kube/impl/KubeResolverImpl.java

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,14 @@
1212

1313
import io.vertx.core.Future;
1414
import io.vertx.core.Vertx;
15-
import io.vertx.core.buffer.Buffer;
1615
import io.vertx.core.http.*;
17-
import io.vertx.core.json.JsonArray;
18-
import io.vertx.core.json.JsonObject;
1916
import io.vertx.core.net.Address;
2017
import io.vertx.core.net.SocketAddress;
2118
import io.vertx.core.spi.endpoint.EndpointBuilder;
2219
import io.vertx.core.spi.endpoint.EndpointResolver;
2320
import io.vertx.serviceresolver.ServiceAddress;
2421
import io.vertx.serviceresolver.kube.KubeResolverOptions;
2522

26-
import java.util.List;
27-
import java.util.function.Function;
28-
29-
import static io.vertx.core.http.HttpMethod.GET;
30-
3123
public class KubeResolverImpl<B> implements EndpointResolver<ServiceAddress, SocketAddress, KubeServiceState<B>, B> {
3224

3325
final KubeResolverOptions options;
@@ -59,45 +51,10 @@ public ServiceAddress tryCast(Address address) {
5951

6052
@Override
6153
public Future<KubeServiceState<B>> resolve(ServiceAddress address, EndpointBuilder<B, SocketAddress> builder) {
62-
return httpClient
63-
.request(new RequestOptions()
64-
.setMethod(GET)
65-
.setServer(server)
66-
.setURI("/api/v1/namespaces/" + namespace + "/endpoints"))
67-
.compose(req -> {
68-
if (bearerToken != null) {
69-
req.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken); // Todo concat that ?
70-
}
71-
return req.send().compose(resp -> {
72-
if (resp.statusCode() == 200) {
73-
return resp
74-
.body()
75-
.map(Buffer::toJsonObject);
76-
} else {
77-
return resp.body().transform(ar -> {
78-
StringBuilder msg = new StringBuilder("Invalid status code " + resp.statusCode());
79-
if (ar.succeeded()) {
80-
msg.append(" : ").append(ar.result().toString());
81-
}
82-
return Future.failedFuture(msg.toString());
83-
});
84-
}
85-
});
86-
}).map(response -> {
87-
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
88-
KubeServiceState<B> state = new KubeServiceState<>(builder, this, vertx, resourceVersion, address.name());
89-
JsonArray items = response.getJsonArray("items");
90-
for (int i = 0;i < items.size();i++) {
91-
JsonObject item = items.getJsonObject(i);
92-
state.handleEndpoints(item);
93-
}
94-
return state;
95-
}).andThen(ar -> {
96-
if (ar.succeeded()) {
97-
KubeServiceState<B> res = ar.result();
98-
res.connectWebSocket();
99-
}
100-
});
54+
KubeServiceState<B> state = new KubeServiceState<>(builder, this, address, vertx, address.name());
55+
return state
56+
.connect()
57+
.map(state);
10158
}
10259

10360
@Override
@@ -107,7 +64,8 @@ public B endpoint(KubeServiceState<B> data) {
10764

10865
@Override
10966
public void close() {
110-
67+
httpClient.close();
68+
wsClient.close();
11169
}
11270

11371
@Override
@@ -125,6 +83,6 @@ public void dispose(KubeServiceState<B> unused) {
12583

12684
@Override
12785
public boolean isValid(KubeServiceState<B> state) {
128-
return true;
86+
return state.valid;
12987
}
13088
}

src/main/java/io/vertx/serviceresolver/kube/impl/KubeServiceState.java

Lines changed: 58 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,90 +10,112 @@
1010
*/
1111
package io.vertx.serviceresolver.kube.impl;
1212

13+
import io.vertx.core.Future;
1314
import io.vertx.core.Vertx;
14-
import io.vertx.core.http.HttpHeaders;
15-
import io.vertx.core.http.WebSocket;
16-
import io.vertx.core.http.WebSocketConnectOptions;
15+
import io.vertx.core.buffer.Buffer;
16+
import io.vertx.core.http.*;
1717
import io.vertx.core.json.JsonArray;
1818
import io.vertx.core.json.JsonObject;
1919
import io.vertx.core.net.SocketAddress;
2020
import io.vertx.core.spi.endpoint.EndpointBuilder;
21+
import io.vertx.serviceresolver.ServiceAddress;
2122

2223
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.concurrent.atomic.AtomicReference;
2526

27+
import static io.vertx.core.http.HttpMethod.GET;
28+
2629
class KubeServiceState<B> {
2730

2831
final String name;
2932
final Vertx vertx;
30-
final KubeResolverImpl resolver;
33+
final KubeResolverImpl<B> resolver;
3134
final EndpointBuilder<B, SocketAddress> endpointsBuilder;
32-
String lastResourceVersion;
35+
final ServiceAddress address;
3336
boolean disposed;
3437
WebSocket ws;
3538
AtomicReference<B> endpoints = new AtomicReference<>();
39+
volatile boolean valid;
3640

37-
KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) {
41+
KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder,
42+
KubeResolverImpl<B> resolver,
43+
ServiceAddress address,
44+
Vertx vertx,
45+
String name) {
3846
this.endpointsBuilder = endpointsBuilder;
3947
this.name = name;
4048
this.resolver = resolver;
4149
this.vertx = vertx;
42-
this.lastResourceVersion = lastResourceVersion;
50+
this.address = address;
51+
this.valid = true;
52+
}
53+
54+
Future<?> connect() {
55+
RequestOptions options = new RequestOptions()
56+
.setMethod(GET)
57+
.setServer(resolver.server)
58+
.setURI("/api/v1/namespaces/" + resolver.namespace + "/endpoints");
59+
if (resolver.bearerToken != null) {
60+
options.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + resolver.bearerToken); // Todo concat that ?
61+
}
62+
return resolver.httpClient
63+
.request(options)
64+
.compose(request -> request
65+
.send()
66+
.expecting(HttpResponseExpectation.SC_OK)
67+
.compose(response -> response
68+
.body()
69+
.map(Buffer::toJsonObject)))
70+
.compose(response -> {
71+
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
72+
JsonArray items = response.getJsonArray("items");
73+
for (int i = 0;i < items.size();i++) {
74+
JsonObject item = items.getJsonObject(i);
75+
updateEndpoints(item);
76+
}
77+
return connectWebSocket(resourceVersion);
78+
});
4379
}
4480

45-
void connectWebSocket() {
81+
Future<?> connectWebSocket(String resourceVersion) {
4682
String requestURI = "/api/v1/namespaces/" + resolver.namespace + "/endpoints?"
4783
+ "watch=true"
4884
+ "&"
4985
+ "allowWatchBookmarks=true"
5086
+ "&"
51-
+ "resourceVersion=" + lastResourceVersion;
87+
+ "resourceVersion=" + resourceVersion;
5288
WebSocketConnectOptions connectOptions = new WebSocketConnectOptions();
5389
connectOptions.setServer(resolver.server);
5490
connectOptions.setURI(requestURI);
5591
if (resolver.bearerToken != null) {
5692
connectOptions.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + resolver.bearerToken);
5793
}
58-
resolver.wsClient.webSocket()
94+
return resolver.wsClient.webSocket()
5995
.handler(buff -> {
6096
JsonObject update = buff.toJsonObject();
6197
handleUpdate(update);
6298
})
6399
.closeHandler(v -> {
64-
if (!disposed) {
65-
connectWebSocket();
66-
}
67-
}).connect(connectOptions).onComplete(ar -> {
68-
if (ar.succeeded()) {
69-
WebSocket ws = ar.result();
70-
if (disposed) {
71-
ws.close();
72-
} else {
73-
this.ws = ws;
74-
}
75-
} else {
76-
if (!disposed) {
77-
// Retry WebSocket connect
78-
vertx.setTimer(500, id -> {
79-
connectWebSocket();
80-
});
81-
}
82-
}
83-
});
100+
valid = false;
101+
}).connect(connectOptions);
84102
}
85103

86104
void handleUpdate(JsonObject update) {
87105
String type = update.getString("type");
88-
JsonObject object = update.getJsonObject("object");
89-
JsonObject metadata = object.getJsonObject("metadata");
90-
String resourceVersion = metadata.getString("resourceVersion");
91-
if (!lastResourceVersion.equals(resourceVersion)) {
92-
handleEndpoints(object);
106+
switch (type) {
107+
case "ADDED":
108+
case "MODIFIED":
109+
case "DELETED":
110+
JsonObject object = update.getJsonObject("object");
111+
JsonObject metadata = object.getJsonObject("metadata");
112+
String resourceVersion = metadata.getString("resourceVersion");
113+
updateEndpoints(object);
114+
break;
93115
}
94116
}
95117

96-
void handleEndpoints(JsonObject item) {
118+
private void updateEndpoints(JsonObject item) {
97119
JsonObject metadata = item.getJsonObject("metadata");
98120
String name = metadata.getString("name");
99121
if (this.name.equals(name)) {

src/test/java/io/vertx/tests/kube/KubeServiceResolverTestBase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void testEmptyPods() {
136136
}
137137

138138
@Test
139-
public void testReconnectWebSocket(TestContext should) throws Exception {
139+
public void testWebSocketClose(TestContext should) throws Exception {
140140
Handler<HttpServerRequest> server = req -> {
141141
req.response().end("" + req.localAddress().port());
142142
};
@@ -148,11 +148,10 @@ public void testReconnectWebSocket(TestContext should) throws Exception {
148148
assertWaitUntil(() -> proxy.webSockets().size() == 1);
149149
WebSocketBase ws = proxy.webSockets().iterator().next();
150150
ws.close();
151-
assertWaitUntil(() -> proxy.webSockets().size() == 1 && !proxy.webSockets().contains(ws));
151+
assertWaitUntil(() -> !proxy.webSockets().contains(ws));
152152
kubernetesMocking.buildAndRegisterKubernetesService(service, kubernetesMocking.defaultNamespace(), KubeOp.UPDATE, pods);
153153
checkEndpoints(service, "8080", "8081");
154154
}
155-
156155
/*
157156
@Test
158157
public void testDispose(TestContext should) throws Exception {

0 commit comments

Comments
 (0)