Skip to content

Commit 8278f54

Browse files
ggivotishun
authored andcommitted
io.lettuce.core.RedisCommandExecutionException: NOAUTH Authentication required on CLIENT and READONLY command (#3035)
Using custom credentials provider can delay providing of credentials. In this case applyPostHandshake and applyConnectionMetadata got executed before handshake and lead to NOAUTH error in the log for CLIENT command.
1 parent 08096cf commit 8278f54

File tree

3 files changed

+85
-16
lines changed

3 files changed

+85
-16
lines changed

pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
</developers>
4646

4747
<properties>
48+
<awaitility.version>4.2.2</awaitility.version>
4849
<assertj-core.version>3.25.3</assertj-core.version>
4950
<cdi-api.version>2.0.SP1</cdi-api.version>
5051
<brave.version>5.13.11</brave.version>
@@ -102,7 +103,12 @@
102103

103104
<dependencyManagement>
104105
<dependencies>
105-
106+
<dependency>
107+
<groupId>org.awaitility</groupId>
108+
<artifactId>awaitility</artifactId>
109+
<version>${awaitility.version}</version>
110+
<scope>test</scope>
111+
</dependency>
106112
<dependency>
107113
<groupId>io.netty</groupId>
108114
<artifactId>netty-bom</artifactId>
@@ -319,6 +325,12 @@
319325
<scope>test</scope>
320326
</dependency>
321327

328+
<dependency>
329+
<groupId>org.awaitility</groupId>
330+
<artifactId>awaitility</artifactId>
331+
<scope>test</scope>
332+
</dependency>
333+
322334
<dependency>
323335
<groupId>org.hdrhistogram</groupId>
324336
<artifactId>HdrHistogram</artifactId>

src/main/java/io/lettuce/core/RedisHandshake.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,14 @@ public CompletionStage<Void> initialize(Channel channel) {
101101
new RedisConnectionException("Protocol version" + this.requestedProtocolVersion + " not supported"));
102102
}
103103

104-
// post-handshake commands, whose execution failures would cause the connection to be considered
105-
// unsuccessfully established
106-
CompletableFuture<Void> postHandshake = applyPostHandshake(channel);
107-
108-
// post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different
109-
// implementations or versions of the server runtime, and whose execution result (whether a success or a
110-
// failure ) should not alter the outcome of the connection attempt
111-
CompletableFuture<Void> connectionMetadata = applyConnectionMetadata(channel).handle((result, error) -> {
112-
if (error != null) {
113-
LOG.debug("Error applying connection metadata", error);
114-
}
115-
return null;
116-
});
117-
118-
return handshake.thenCompose(ignore -> postHandshake).thenCompose(ignore -> connectionMetadata);
104+
return handshake
105+
// post-handshake commands, whose execution failures would cause the connection to be considered
106+
// unsuccessfully established
107+
.thenCompose(ignore -> applyPostHandshake(channel))
108+
// post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different
109+
// implementations or versions of the server runtime, and whose execution result (whether a success or a
110+
// failure ) should not alter the outcome of the connection attempt
111+
.thenCompose(ignore -> applyConnectionMetadataSafely(channel));
119112
}
120113

121114
private CompletionStage<?> tryHandshakeResp3(Channel channel) {
@@ -271,6 +264,15 @@ private CompletableFuture<Void> applyPostHandshake(Channel channel) {
271264
return dispatch(channel, postHandshake);
272265
}
273266

267+
private CompletionStage<Void> applyConnectionMetadataSafely(Channel channel) {
268+
return applyConnectionMetadata(channel).handle((result, error) -> {
269+
if (error != null) {
270+
LOG.debug("Error applying connection metadata", error);
271+
}
272+
return null;
273+
});
274+
}
275+
274276
private CompletableFuture<Void> applyConnectionMetadata(Channel channel) {
275277

276278
List<AsyncCommand<?, ?, ?>> postHandshake = new ArrayList<>();

src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package io.lettuce.core;
22

33
import static org.assertj.core.api.Assertions.*;
4+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
45

56
import java.nio.ByteBuffer;
67
import java.util.List;
78
import java.util.Map;
89
import java.util.concurrent.CompletionStage;
910

11+
import org.awaitility.Awaitility;
1012
import org.junit.jupiter.api.Test;
1113

1214
import io.lettuce.core.output.CommandOutput;
1315
import io.lettuce.core.protocol.AsyncCommand;
1416
import io.lettuce.core.protocol.ProtocolVersion;
1517
import io.netty.channel.embedded.EmbeddedChannel;
18+
import reactor.core.publisher.Mono;
19+
import reactor.core.publisher.Sinks;
1620

1721
/**
1822
* Unit tests for {@link RedisHandshake}.
@@ -103,6 +107,42 @@ void handshakeFireAndForgetPostHandshake() {
103107
assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse();
104108
}
105109

110+
@Test
111+
void handshakeDelayedCredentialProvider() {
112+
113+
DelayedRedisCredentialsProvider cp = new DelayedRedisCredentialsProvider();
114+
// RedisCredentialsProvider cp = () -> Mono.just(RedisCredentials.just("foo",
115+
// "bar")).delayElement(Duration.ofMillis(3));
116+
EmbeddedChannel channel = new EmbeddedChannel(true, false);
117+
118+
ConnectionMetadata connectionMetdata = new ConnectionMetadata();
119+
connectionMetdata.setLibraryName("library-name");
120+
connectionMetdata.setLibraryVersion("library-version");
121+
122+
ConnectionState state = new ConnectionState();
123+
state.setCredentialsProvider(cp);
124+
state.apply(connectionMetdata);
125+
RedisHandshake handshake = new RedisHandshake(null, false, state);
126+
CompletionStage<Void> handshakeInit = handshake.initialize(channel);
127+
cp.completeCredentials(RedisCredentials.just("foo", "bar"));
128+
129+
Awaitility.await().atMost(50, MILLISECONDS) // Wait up to 5 seconds
130+
.pollInterval(5, MILLISECONDS) // Poll every 50 milliseconds
131+
.until(() -> !channel.outboundMessages().isEmpty());
132+
133+
AsyncCommand<String, String, Map<String, String>> hello = channel.readOutbound();
134+
helloResponse(hello.getOutput());
135+
hello.complete();
136+
137+
List<AsyncCommand<String, String, Map<String, String>>> postHandshake = channel.readOutbound();
138+
postHandshake.get(0).getOutput().setError(ERR_UNKNOWN_COMMAND);
139+
postHandshake.get(0).completeExceptionally(new RedisException(ERR_UNKNOWN_COMMAND));
140+
postHandshake.get(0).complete();
141+
142+
assertThat(postHandshake.size()).isEqualTo(2);
143+
assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse();
144+
}
145+
106146
@Test
107147
void shouldParseVersionWithCharacters() {
108148

@@ -133,4 +173,19 @@ private static void helloResponse(CommandOutput<String, String, Map<String, Stri
133173
output.set(ByteBuffer.wrap("1.2.3".getBytes()));
134174
}
135175

176+
static class DelayedRedisCredentialsProvider implements RedisCredentialsProvider {
177+
178+
private final Sinks.One<RedisCredentials> credentialsSink = Sinks.one();
179+
180+
@Override
181+
public Mono<RedisCredentials> resolveCredentials() {
182+
return credentialsSink.asMono();
183+
}
184+
185+
public void completeCredentials(RedisCredentials credentials) {
186+
credentialsSink.tryEmitValue(credentials);
187+
}
188+
189+
}
190+
136191
}

0 commit comments

Comments
 (0)