Skip to content

Commit eff243d

Browse files
ggivotishun
andauthored
Token based authentication integration with core extension (#3063)
* Support for StreamingCredentials This enables use cases like credential rotation and token based auth without client disconnect. Especially with Pub/Sub clients will reduce the chnance of missing events. * Tests & publish ReauthEvent * Clean up & Format & Add ReauthenticateEvent test * Conditionally enable connection reauthentication based on client setting DEFAULT_REAUTHENTICATE_BEHAVIOUR * Client setting for enabling reauthentication - Moved Authentication handler to DefaultEndpoint - updated since 6.6.0 * formating * resolve conflict with main * format * dispath using connection handler * Support multi with re-auth Defer the re-auth operation in case there is on-going multi Tx in lettuce need to be externally synchronised when used in multithreaded env. Since re-auth happens from different thread we need to make sure it does not happen while there is ongoing transaction. * Fix EndpointId missing in events * format * Add unit tests for setCredenatials * Skip preProcessing of auth command to avoid replacing the credential provider with static one provider Add unit tests for setCredentials * clean up - remove dead code * Moved almost all code inside the new handler * fix inTransaction lock with dispatch command batch * Remove StreamingCredentialsProvider interface. move credentials() method to RedisCredentialsProvider. Resolve issue with unsafe cast after extending RedisCredentialsProvider with supportsStreaming() method * Add authentication handler to ClusterPubSub connections * Token based auth integration with core extension Provide a way for lettuce clients to use token-based authentication. TOKENs come with a TTL. After a Redis client authenticates with a TOKEN, if they didn't renew their authentication we need to evict (close) them. The suggested approach is to leverage the existing CredentialsProvider and add support for streaming credentials to handle token refresh scenarios. Each time a new token is received connection is reauthenticated. * rebase to address "oid" core-autx lib change formating * Add EntraId integration tests Verify authentication using Azure AD with service principals * StreamingCredentialsProvider replaced with RedisCredentialsProvider.supportsStreaming() * pub/sub test basic functionality with entraid auth * Update src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java Co-authored-by: Tihomir Krasimirov Mateev <[email protected]> * Addressing review comments from @tishun * Bump redis-authx-core & redis-authx-entraid from 0.1.0-SNAPSHOT to 0.1.1-beta1 * add java doc for TokenBasedRedisCredentialsProvider --------- Co-authored-by: Tihomir Mateev <[email protected]> Co-authored-by: Tihomir Krasimirov Mateev <[email protected]>
1 parent 14de5b8 commit eff243d

27 files changed

+2245
-22
lines changed

pom.xml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,23 @@
178178
</dependencyManagement>
179179

180180
<dependencies>
181-
181+
<dependency>
182+
<groupId>redis.clients.authentication</groupId>
183+
<artifactId>redis-authx-core</artifactId>
184+
<version>0.1.1-beta1</version>
185+
</dependency>
186+
<dependency>
187+
<groupId>redis.clients.authentication</groupId>
188+
<artifactId>redis-authx-entraid</artifactId>
189+
<version>0.1.1-beta1</version>
190+
<scope>test</scope>
191+
</dependency>
192+
<dependency>
193+
<groupId>io.github.cdimascio</groupId>
194+
<artifactId>dotenv-java</artifactId>
195+
<version>2.2.0</version>
196+
<scope>test</scope>
197+
</dependency>
182198
<!-- Start of core dependencies -->
183199

184200
<dependency>
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2024, Redis Ltd. and Contributors
3+
* All rights reserved.
4+
*
5+
* Licensed under the MIT License.
6+
*/
7+
package io.lettuce.authx;
8+
9+
import io.lettuce.core.RedisCredentials;
10+
import io.lettuce.core.RedisCredentialsProvider;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import reactor.core.publisher.Flux;
14+
import reactor.core.publisher.Mono;
15+
import reactor.core.publisher.Sinks;
16+
import redis.clients.authentication.core.Token;
17+
import redis.clients.authentication.core.TokenAuthConfig;
18+
import redis.clients.authentication.core.TokenListener;
19+
import redis.clients.authentication.core.TokenManager;
20+
21+
/**
22+
* A {@link RedisCredentialsProvider} implementation that supports token-based authentication for Redis.
23+
* <p>
24+
* This provider uses a {@link TokenManager} to manage and renew tokens, ensuring that the Redis client can authenticate with
25+
* Redis using a dynamically updated token. This is particularly useful in scenarios where Redis access is controlled via
26+
* token-based authentication, such as when Redis is integrated with an identity provider like EntraID.
27+
* </p>
28+
* <p>
29+
* The provider supports streaming of credentials and automatically emits new credentials whenever a token is renewed. It must
30+
* be used with {@link io.lettuce.core.ClientOptions.ReauthenticateBehavior#ON_NEW_CREDENTIALS} to automatically re-authenticate
31+
* connections whenever new tokens are emitted by the provider.
32+
* </p>
33+
* <p>
34+
* The lifecycle of this provider is externally managed. It should be closed when there are no longer any connections using it,
35+
* to stop the token management process and release resources.
36+
* </p>
37+
*
38+
* @since 6.6
39+
*/
40+
public class TokenBasedRedisCredentialsProvider implements RedisCredentialsProvider, AutoCloseable {
41+
42+
private static final Logger log = LoggerFactory.getLogger(TokenBasedRedisCredentialsProvider.class);
43+
44+
private final TokenManager tokenManager;
45+
46+
private final Sinks.Many<RedisCredentials> credentialsSink = Sinks.many().replay().latest();
47+
48+
private TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
49+
this.tokenManager = tokenManager;
50+
}
51+
52+
private void init() {
53+
54+
TokenListener listener = new TokenListener() {
55+
56+
@Override
57+
public void onTokenRenewed(Token token) {
58+
String username = token.getUser();
59+
char[] pass = token.getValue().toCharArray();
60+
RedisCredentials credentials = RedisCredentials.just(username, pass);
61+
credentialsSink.tryEmitNext(credentials);
62+
}
63+
64+
@Override
65+
public void onError(Exception exception) {
66+
log.error("Token renew failed!", exception);
67+
}
68+
69+
};
70+
71+
try {
72+
tokenManager.start(listener, false);
73+
} catch (Exception e) {
74+
credentialsSink.tryEmitError(e);
75+
tokenManager.stop();
76+
throw new RuntimeException("Failed to start TokenManager", e);
77+
}
78+
}
79+
80+
/**
81+
* Resolve the latest available credentials as a Mono.
82+
* <p>
83+
* This method returns a Mono that emits the most recent set of Redis credentials. The Mono will complete once the
84+
* credentials are emitted. If no credentials are available at the time of subscription, the Mono will wait until
85+
* credentials are available.
86+
*
87+
* @return a Mono that emits the latest Redis credentials
88+
*/
89+
@Override
90+
public Mono<RedisCredentials> resolveCredentials() {
91+
92+
return credentialsSink.asFlux().next();
93+
}
94+
95+
/**
96+
* Expose the Flux for all credential updates.
97+
* <p>
98+
* This method returns a Flux that emits all updates to the Redis credentials. Subscribers will receive the latest
99+
* credentials whenever they are updated. The Flux will continue to emit updates until the provider is shut down.
100+
*
101+
* @return a Flux that emits all updates to the Redis credentials
102+
*/
103+
@Override
104+
public Flux<RedisCredentials> credentials() {
105+
106+
return credentialsSink.asFlux().onBackpressureLatest(); // Provide a continuous stream of credentials
107+
}
108+
109+
@Override
110+
public boolean supportsStreaming() {
111+
return true;
112+
}
113+
114+
/**
115+
* Stop the credentials provider and clean up resources.
116+
* <p>
117+
* This method stops the TokenManager and completes the credentials sink, ensuring that all resources are properly released.
118+
* It should be called when the credentials provider is no longer needed.
119+
*/
120+
@Override
121+
public void close() {
122+
credentialsSink.tryEmitComplete();
123+
tokenManager.stop();
124+
}
125+
126+
public static TokenBasedRedisCredentialsProvider create(TokenAuthConfig tokenAuthConfig) {
127+
return create(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
128+
tokenAuthConfig.getTokenManagerConfig()));
129+
}
130+
131+
public static TokenBasedRedisCredentialsProvider create(TokenManager tokenManager) {
132+
TokenBasedRedisCredentialsProvider credentialManager = new TokenBasedRedisCredentialsProvider(tokenManager);
133+
credentialManager.init();
134+
return credentialManager;
135+
}
136+
137+
}

src/main/java/io/lettuce/core/ClientOptions.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class ClientOptions implements Serializable {
5555

5656
public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;
5757

58+
public static final ReauthenticateBehavior DEFAULT_REAUTHENTICATE_BEHAVIOUR = ReauthenticateBehavior.DEFAULT;
59+
5860
public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;
5961

6062
public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
@@ -95,6 +97,8 @@ public class ClientOptions implements Serializable {
9597

9698
private final DisconnectedBehavior disconnectedBehavior;
9799

100+
private final ReauthenticateBehavior reauthenticateBehavior;
101+
98102
private final boolean publishOnScheduler;
99103

100104
private final boolean pingBeforeActivateConnection;
@@ -124,6 +128,7 @@ protected ClientOptions(Builder builder) {
124128
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
125129
this.decodeBufferPolicy = builder.decodeBufferPolicy;
126130
this.disconnectedBehavior = builder.disconnectedBehavior;
131+
this.reauthenticateBehavior = builder.reauthenticateBehavior;
127132
this.publishOnScheduler = builder.publishOnScheduler;
128133
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
129134
this.protocolVersion = builder.protocolVersion;
@@ -143,6 +148,7 @@ protected ClientOptions(ClientOptions original) {
143148
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
144149
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
145150
this.disconnectedBehavior = original.getDisconnectedBehavior();
151+
this.reauthenticateBehavior = original.getReauthenticateBehaviour();
146152
this.publishOnScheduler = original.isPublishOnScheduler();
147153
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
148154
this.protocolVersion = original.getConfiguredProtocolVersion();
@@ -220,6 +226,8 @@ public static class Builder {
220226

221227
private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;
222228

229+
private ReauthenticateBehavior reauthenticateBehavior = DEFAULT_REAUTHENTICATE_BEHAVIOUR;
230+
223231
private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;
224232

225233
protected Builder() {
@@ -301,6 +309,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
301309
return this;
302310
}
303311

312+
/**
313+
* Configure the {@link ReauthenticateBehavior} of the Lettuce driver. Defaults to
314+
* {@link ReauthenticateBehavior#DEFAULT}.
315+
*
316+
* @param reauthenticateBehavior the {@link ReauthenticateBehavior} to use. Must not be {@code null}.
317+
* @return {@code this}
318+
*/
319+
public Builder reauthenticateBehavior(ReauthenticateBehavior reauthenticateBehavior) {
320+
321+
LettuceAssert.notNull(reauthenticateBehavior, "ReuthenticatBehavior must not be null");
322+
this.reauthenticateBehavior = reauthenticateBehavior;
323+
return this;
324+
}
325+
304326
/**
305327
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
306328
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
@@ -505,11 +527,12 @@ public ClientOptions.Builder mutate() {
505527

506528
builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
507529
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
508-
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
509-
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
510-
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
511-
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
512-
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
530+
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
531+
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
532+
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
533+
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
534+
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
535+
.timeoutOptions(getTimeoutOptions());
513536

514537
return builder;
515538
}
@@ -573,6 +596,16 @@ public DisconnectedBehavior getDisconnectedBehavior() {
573596
return disconnectedBehavior;
574597
}
575598

599+
/**
600+
* Behavior for re-authentication when the {@link RedisCredentialsProvider} emits new credentials. Defaults to
601+
* {@link ReauthenticateBehavior#DEFAULT}.
602+
*
603+
* @return the currently set {@link ReauthenticateBehavior}.
604+
*/
605+
public ReauthenticateBehavior getReauthenticateBehaviour() {
606+
return reauthenticateBehavior;
607+
}
608+
576609
/**
577610
* Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}.
578611
*
@@ -704,6 +737,46 @@ public TimeoutOptions getTimeoutOptions() {
704737
return timeoutOptions;
705738
}
706739

740+
/**
741+
* Defines the re-authentication behavior of the Redis client.
742+
* <p/>
743+
* Certain implementations of the {@link RedisCredentialsProvider} could emit new credentials at runtime. This setting
744+
* controls how the driver reacts to these newly emitted credentials.
745+
*/
746+
public enum ReauthenticateBehavior {
747+
748+
/**
749+
* This is the default behavior. The client will fetch current credentials from the underlying
750+
* {@link RedisCredentialsProvider} only when the driver needs to, e.g. when the connection is first established or when
751+
* it is re-established after a disconnect.
752+
* <p/>
753+
* <p>
754+
* No re-authentication is performed when new credentials are emitted by a {@link RedisCredentialsProvider} that
755+
* supports streaming. The client does not subscribe to or react to any updates in the credential stream provided by
756+
* {@link RedisCredentialsProvider#credentials()}.
757+
* </p>
758+
*/
759+
DEFAULT,
760+
761+
/**
762+
* Automatically triggers re-authentication whenever new credentials are emitted by a {@link RedisCredentialsProvider}
763+
* that supports streaming, as indicated by {@link RedisCredentialsProvider#supportsStreaming()}.
764+
*
765+
* <p>
766+
* When this behavior is enabled, the client subscribes to the credential stream provided by
767+
* {@link RedisCredentialsProvider#credentials()} and issues an {@code AUTH} command to the Redis server each time new
768+
* credentials are received. This behavior supports dynamic credential scenarios, such as token-based authentication, or
769+
* credential rotation where credentials are refreshed periodically to maintain access.
770+
* </p>
771+
*
772+
* <p>
773+
* Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted commands, as the
774+
* client performs re-authentication independently of user command flow.
775+
* </p>
776+
*/
777+
ON_NEW_CREDENTIALS
778+
}
779+
707780
/**
708781
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
709782
*

0 commit comments

Comments
 (0)