|
22 | 22 | import static io.lettuce.TestTags.INTEGRATION_TEST;
|
23 | 23 | import static io.lettuce.test.LettuceExtension.*;
|
24 | 24 | import static org.assertj.core.api.Assertions.*;
|
| 25 | +import static org.junit.jupiter.api.Assertions.assertFalse; |
25 | 26 |
|
26 | 27 | import java.util.ArrayList;
|
27 | 28 | import java.util.Collections;
|
|
30 | 31 | import java.util.List;
|
31 | 32 | import java.util.Map;
|
32 | 33 | import java.util.Set;
|
| 34 | +import java.util.concurrent.TimeUnit; |
33 | 35 | import java.util.stream.Collectors;
|
| 36 | +import java.util.concurrent.CompletableFuture; |
| 37 | +import java.util.function.Function; |
| 38 | + |
| 39 | +import io.lettuce.core.resource.ClientResources; |
| 40 | +import io.lettuce.core.resource.DefaultClientResources; |
| 41 | +import io.lettuce.core.resource.DefaultEventLoopGroupProvider; |
34 | 42 |
|
35 | 43 | import javax.enterprise.inject.New;
|
36 | 44 | import javax.inject.Inject;
|
@@ -666,6 +674,49 @@ void clusterScanCursorFinished() {
|
666 | 674 | assertThatThrownBy(() -> sync.scan(ScanCursor.FINISHED)).isInstanceOf(IllegalArgumentException.class);
|
667 | 675 | }
|
668 | 676 |
|
| 677 | + @Test |
| 678 | + void clusterScanAsyncSingleIoThread() { |
| 679 | + |
| 680 | + // Create a dedicated client with a single IO thread to stress async chaining |
| 681 | + ClientResources resources = DefaultClientResources.builder() |
| 682 | + .eventLoopGroupProvider(new DefaultEventLoopGroupProvider(1)).build(); |
| 683 | + RedisClusterClient singleThreadClient = RedisClusterClient.create(resources, |
| 684 | + RedisURI.Builder.redis(ClusterTestSettings.host, ClusterTestSettings.port1).build()); |
| 685 | + StatefulRedisClusterConnection<String, String> conn = singleThreadClient.connect(); |
| 686 | + |
| 687 | + // Seed keys across the cluster |
| 688 | + conn.sync().mset(KeysAndValues.MAP); |
| 689 | + |
| 690 | + Set<String> allKeys = new HashSet<>(); |
| 691 | + ScanArgs args = ScanArgs.Builder.limit(200); |
| 692 | + |
| 693 | + boolean timedOut = false; |
| 694 | + try { |
| 695 | + scanDatabase(conn, ScanCursor.INITIAL, args, keys -> { |
| 696 | + allKeys.addAll(keys); |
| 697 | + return CompletableFuture.completedFuture(null); |
| 698 | + }).get(5, TimeUnit.SECONDS); |
| 699 | + } catch (Exception e) { |
| 700 | + timedOut = true; |
| 701 | + } |
| 702 | + |
| 703 | + assertFalse(timedOut); |
| 704 | + assertThat(allKeys).containsAll(KeysAndValues.KEYS); |
| 705 | + } |
| 706 | + |
| 707 | + private CompletableFuture<Void> scanDatabase(StatefulRedisClusterConnection<String, String> connection, ScanCursor cursor, |
| 708 | + ScanArgs scanArgs, Function<List<String>, CompletableFuture<?>> keysProcessor) { |
| 709 | + |
| 710 | + RedisAdvancedClusterAsyncCommands<String, String> asyncCommands = connection.async(); |
| 711 | + RedisFuture<KeyScanCursor<String>> res = asyncCommands.scan(cursor, scanArgs); |
| 712 | + return res.thenCompose(newCursor -> keysProcessor.apply(newCursor.getKeys()).thenCompose(ignore -> { |
| 713 | + if (newCursor.isFinished()) { |
| 714 | + return CompletableFuture.completedFuture(null); |
| 715 | + } |
| 716 | + return scanDatabase(connection, newCursor, scanArgs, keysProcessor); |
| 717 | + })).toCompletableFuture(); |
| 718 | + } |
| 719 | + |
669 | 720 | @Test
|
670 | 721 | void clusterScanCursorNotReused() {
|
671 | 722 | assertThatThrownBy(() -> sync.scan(ScanCursor.of("dummy"))).isInstanceOf(IllegalArgumentException.class);
|
|
0 commit comments