Skip to content

Commit ef8c5e4

Browse files
committed
Fix error command handling code logic and add integration test for encoding failure
Summary: Fix error command handling code logic and add integration test for encoding failure Test Plan: unittest, integration test Reviewers: #ldap_storage_sre_cache, ureview, jingzhao Reviewed By: #ldap_storage_sre_cache, jingzhao Tags: #has_java JIRA Issues: REDIS-14192 Differential Revision: https://code.uberinternal.com/D19271701
1 parent bc51fcb commit ef8c5e4

File tree

2 files changed

+138
-11
lines changed

2 files changed

+138
-11
lines changed

src/main/java/io/lettuce/core/protocol/CommandHandler.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
144144
/**
145145
* Initialize a new instance that handles commands from the supplied queue.
146146
*
147-
* @param clientOptions client options for this connection, must not be {@code null}
147+
* @param clientOptions client options for this connection, must not be {@code null}
148148
* @param clientResources client resources for this connection, must not be {@code null}
149-
* @param endpoint must not be {@code null}.
149+
* @param endpoint must not be {@code null}.
150150
*/
151151
public CommandHandler(ClientOptions clientOptions, ClientResources clientResources, Endpoint endpoint) {
152152

@@ -291,7 +291,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
291291
logger.debug("{} Cleaning up encoding failure command {}", logPrefix(), failed);
292292
}
293293
}
294-
294+
295295
if (!stack.isEmpty()) {
296296
RedisCommand<?, ?, ?> command = stack.poll();
297297
if (debugEnabled) {
@@ -683,6 +683,18 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
683683
} else {
684684

685685
RedisCommand<?, ?, ?> command = stack.peek();
686+
// Clean up encoding failures before processing valid responses
687+
while (!stack.isEmpty() && stack.peek().hasEncodingError()) {
688+
RedisCommand<?, ?, ?> failed = stack.poll();
689+
if (debugEnabled) {
690+
logger.debug("{} Cleaning up encoding failure command {}", logPrefix(), failed);
691+
}
692+
// Encoding failures were already completed exceptionally during encoding
693+
if (!stack.isEmpty()) {
694+
command = stack.peek();
695+
}
696+
}
697+
686698
if (debugEnabled) {
687699
logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
688700
}
@@ -706,14 +718,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
706718
if (isProtectedMode(command)) {
707719
onProtectedMode(command.getOutput().getError());
708720
} else {
709-
// Clean up encoding failures before processing valid responses
710-
while (!stack.isEmpty() && stack.peek().hasEncodingError()) {
711-
RedisCommand<?, ?, ?> failed = stack.poll();
712-
if (debugEnabled) {
713-
logger.debug("{} Cleaning up encoding failure command {}", logPrefix(), failed);
714-
}
715-
// Encoding failures were already completed exceptionally during encoding
716-
}
717721

718722
if (canComplete(command)) {
719723
stack.poll();
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright 2011-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.lettuce.core.protocol;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import java.nio.ByteBuffer;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import javax.inject.Inject;
28+
29+
import io.lettuce.core.RedisClient;
30+
import io.lettuce.core.TestSupport;
31+
import io.lettuce.core.api.sync.RedisCommands;
32+
import io.lettuce.core.codec.RedisCodec;
33+
import io.netty.handler.codec.EncoderException;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.TestInstance;
37+
import org.junit.jupiter.api.extension.ExtendWith;
38+
39+
import io.lettuce.core.api.StatefulRedisConnection;
40+
import io.lettuce.core.api.async.RedisAsyncCommands;
41+
import io.lettuce.core.internal.Futures;
42+
import io.lettuce.test.LettuceExtension;
43+
44+
/**
45+
* Integration tests for command encoding error scenarios with GET/SET commands
46+
* against a Redis test instance.
47+
*
48+
* @author Lettuce Contributors
49+
*/
50+
@ExtendWith(LettuceExtension.class)
51+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
52+
class CommandEncodingErrorIntegrationTests extends TestSupport {
53+
54+
private final RedisClient client;
55+
private final StatefulRedisConnection<String, String> connection;
56+
57+
@Inject
58+
CommandEncodingErrorIntegrationTests(RedisClient client, StatefulRedisConnection<String, String> connection) {
59+
this.client = client;
60+
this.connection = connection;
61+
}
62+
63+
@BeforeEach
64+
void setUp() {
65+
this.connection.async().flushall();
66+
}
67+
68+
@Test
69+
void testCommandsWithCustomCodec() {
70+
// Create a codec that fails during value encoding with "encoding_failure" keyword
71+
RedisCodec<String, String> failingCodec = new RedisCodec<String, String>() {
72+
@Override
73+
public String decodeKey(ByteBuffer bytes) {
74+
return StandardCharsets.UTF_8.decode(bytes).toString();
75+
}
76+
77+
@Override
78+
public String decodeValue(ByteBuffer bytes) {
79+
return StandardCharsets.UTF_8.decode(bytes).toString();
80+
}
81+
82+
@Override
83+
public ByteBuffer encodeKey(String key) {
84+
return StandardCharsets.UTF_8.encode(key);
85+
}
86+
87+
@Override
88+
public ByteBuffer encodeValue(String value) {
89+
// Only throw exception for specific value to test selective encoding failure
90+
if ("encoding_failure".equals(value)) {
91+
throw new RuntimeException("Simulated encoding failure during value encoding");
92+
}
93+
return StandardCharsets.UTF_8.encode(value);
94+
}
95+
};
96+
97+
try (StatefulRedisConnection<String, String> customConnection = client.connect(failingCodec)) {
98+
RedisCommands<String, String> customRedis = customConnection.sync();
99+
100+
// First, test that normal values work fine
101+
String normalKey = "normal-key";
102+
String normalValue = "normal-value";
103+
104+
String result = customRedis.set(normalKey, normalValue);
105+
assertThat(result).isEqualTo("OK");
106+
107+
String retrieved = customRedis.get(normalKey);
108+
assertThat(retrieved).isEqualTo(normalValue);
109+
110+
// Now test that the specific failure value throws an exception
111+
String failingKey = "failing-key";
112+
String failingValue = "encoding_failure";
113+
114+
assertThatThrownBy(() -> customRedis.set(failingKey, failingValue))
115+
.isInstanceOf(EncoderException.class)
116+
.hasMessageContaining("Cannot encode command");
117+
118+
// test that we can get correct response after encoding failure
119+
retrieved = customRedis.get(normalKey);
120+
assertThat(retrieved).isEqualTo(normalValue);
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)