Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,14 @@

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.endpoint.EndpointBuilder;
import io.vertx.core.spi.endpoint.EndpointResolver;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.kube.KubeResolverOptions;

import java.util.List;
import java.util.function.Function;

import static io.vertx.core.http.HttpMethod.GET;

public class KubeResolverImpl<B> implements EndpointResolver<ServiceAddress, SocketAddress, KubeServiceState<B>, B> {

final KubeResolverOptions options;
Expand Down Expand Up @@ -59,45 +51,10 @@ public ServiceAddress tryCast(Address address) {

@Override
public Future<KubeServiceState<B>> resolve(ServiceAddress address, EndpointBuilder<B, SocketAddress> builder) {
return httpClient
.request(new RequestOptions()
.setMethod(GET)
.setServer(server)
.setURI("/api/v1/namespaces/" + namespace + "/endpoints"))
.compose(req -> {
if (bearerToken != null) {
req.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken); // Todo concat that ?
}
return req.send().compose(resp -> {
if (resp.statusCode() == 200) {
return resp
.body()
.map(Buffer::toJsonObject);
} else {
return resp.body().transform(ar -> {
StringBuilder msg = new StringBuilder("Invalid status code " + resp.statusCode());
if (ar.succeeded()) {
msg.append(" : ").append(ar.result().toString());
}
return Future.failedFuture(msg.toString());
});
}
});
}).map(response -> {
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
KubeServiceState<B> state = new KubeServiceState<>(builder, this, vertx, resourceVersion, address.name());
JsonArray items = response.getJsonArray("items");
for (int i = 0;i < items.size();i++) {
JsonObject item = items.getJsonObject(i);
state.handleEndpoints(item);
}
return state;
}).andThen(ar -> {
if (ar.succeeded()) {
KubeServiceState<B> res = ar.result();
res.connectWebSocket();
}
});
KubeServiceState<B> state = new KubeServiceState<>(builder, this, address, vertx, address.name());
return state
.connect()
.map(state);
}

@Override
Expand All @@ -107,7 +64,8 @@ public B endpoint(KubeServiceState<B> data) {

@Override
public void close() {

httpClient.close();
wsClient.close();
}

@Override
Expand All @@ -125,6 +83,6 @@ public void dispose(KubeServiceState<B> unused) {

@Override
public boolean isValid(KubeServiceState<B> state) {
return true;
return state.valid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,90 +10,112 @@
*/
package io.vertx.serviceresolver.kube.impl;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketConnectOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.endpoint.EndpointBuilder;
import io.vertx.serviceresolver.ServiceAddress;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static io.vertx.core.http.HttpMethod.GET;

class KubeServiceState<B> {

final String name;
final Vertx vertx;
final KubeResolverImpl resolver;
final KubeResolverImpl<B> resolver;
final EndpointBuilder<B, SocketAddress> endpointsBuilder;
String lastResourceVersion;
final ServiceAddress address;
boolean disposed;
WebSocket ws;
AtomicReference<B> endpoints = new AtomicReference<>();
volatile boolean valid;

KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) {
KubeServiceState(EndpointBuilder<B, SocketAddress> endpointsBuilder,
KubeResolverImpl<B> resolver,
ServiceAddress address,
Vertx vertx,
String name) {
this.endpointsBuilder = endpointsBuilder;
this.name = name;
this.resolver = resolver;
this.vertx = vertx;
this.lastResourceVersion = lastResourceVersion;
this.address = address;
this.valid = true;
}

Future<?> connect() {
RequestOptions options = new RequestOptions()
.setMethod(GET)
.setServer(resolver.server)
.setURI("/api/v1/namespaces/" + resolver.namespace + "/endpoints");
if (resolver.bearerToken != null) {
options.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + resolver.bearerToken); // Todo concat that ?
}
return resolver.httpClient
.request(options)
.compose(request -> request
.send()
.expecting(HttpResponseExpectation.SC_OK)
.compose(response -> response
.body()
.map(Buffer::toJsonObject)))
.compose(response -> {
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
JsonArray items = response.getJsonArray("items");
for (int i = 0;i < items.size();i++) {
JsonObject item = items.getJsonObject(i);
updateEndpoints(item);
}
return connectWebSocket(resourceVersion);
});
}

void connectWebSocket() {
Future<?> connectWebSocket(String resourceVersion) {
String requestURI = "/api/v1/namespaces/" + resolver.namespace + "/endpoints?"
+ "watch=true"
+ "&"
+ "allowWatchBookmarks=true"
+ "&"
+ "resourceVersion=" + lastResourceVersion;
+ "resourceVersion=" + resourceVersion;
WebSocketConnectOptions connectOptions = new WebSocketConnectOptions();
connectOptions.setServer(resolver.server);
connectOptions.setURI(requestURI);
if (resolver.bearerToken != null) {
connectOptions.putHeader(HttpHeaders.AUTHORIZATION, "Bearer " + resolver.bearerToken);
}
resolver.wsClient.webSocket()
return resolver.wsClient.webSocket()
.handler(buff -> {
JsonObject update = buff.toJsonObject();
handleUpdate(update);
})
.closeHandler(v -> {
if (!disposed) {
connectWebSocket();
}
}).connect(connectOptions).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();
if (disposed) {
ws.close();
} else {
this.ws = ws;
}
} else {
if (!disposed) {
// Retry WebSocket connect
vertx.setTimer(500, id -> {
connectWebSocket();
});
}
}
});
valid = false;
}).connect(connectOptions);
}

void handleUpdate(JsonObject update) {
String type = update.getString("type");
JsonObject object = update.getJsonObject("object");
JsonObject metadata = object.getJsonObject("metadata");
String resourceVersion = metadata.getString("resourceVersion");
if (!lastResourceVersion.equals(resourceVersion)) {
handleEndpoints(object);
switch (type) {
case "ADDED":
case "MODIFIED":
case "DELETED":
JsonObject object = update.getJsonObject("object");
JsonObject metadata = object.getJsonObject("metadata");
String resourceVersion = metadata.getString("resourceVersion");
updateEndpoints(object);
break;
}
}

void handleEndpoints(JsonObject item) {
private void updateEndpoints(JsonObject item) {
JsonObject metadata = item.getJsonObject("metadata");
String name = metadata.getString("name");
if (this.name.equals(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testEmptyPods() {
}

@Test
public void testReconnectWebSocket(TestContext should) throws Exception {
public void testWebSocketClose(TestContext should) throws Exception {
Handler<HttpServerRequest> server = req -> {
req.response().end("" + req.localAddress().port());
};
Expand All @@ -148,11 +148,10 @@ public void testReconnectWebSocket(TestContext should) throws Exception {
assertWaitUntil(() -> proxy.webSockets().size() == 1);
WebSocketBase ws = proxy.webSockets().iterator().next();
ws.close();
assertWaitUntil(() -> proxy.webSockets().size() == 1 && !proxy.webSockets().contains(ws));
assertWaitUntil(() -> !proxy.webSockets().contains(ws));
kubernetesMocking.buildAndRegisterKubernetesService(service, kubernetesMocking.defaultNamespace(), KubeOp.UPDATE, pods);
checkEndpoints(service, "8080", "8081");
}

/*
@Test
public void testDispose(TestContext should) throws Exception {
Expand Down