Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/user-guide/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ RedisFuture<Void> future = async.subscribe("channel");
### Reactive API

The reactive API provides hot `Observable`s to listen on
`ChannelMessage`s and `PatternMessage`s. The `Observable`s receive all
`SubscriptionMessage`. The `Observable`s receive all
inbound messages. You can do filtering using the observable chain if you
need to filter out the interesting ones, The `Observable` stops
triggering events when the subscriber unsubscribes from it.
Expand All @@ -54,7 +54,14 @@ StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub(
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
reactive.observe(sink -> new RedisPubSubAdapter<String, String>() {

@Override
public void message(String channel, String message) {
sink.next(new SubscriptionMessage<>(channel, message));
}

}).doOnNext(subscriptionMessage -> {...}).subscribe()

// application flow continues
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
*/
package io.lettuce.core.pubsub;

import io.lettuce.core.pubsub.api.reactive.SubscriptionMessage;
import java.util.Map;

import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.codec.RedisCodec;
Expand Down Expand Up @@ -114,6 +117,27 @@ public void message(K channel, V message) {
}, overflowStrategy);
}

@Override
public Flux<SubscriptionMessage<K, V>> observe(
Function<FluxSink<SubscriptionMessage<K, V>>, RedisPubSubAdapter<K, V>> adapterFactory) {
return observe(FluxSink.OverflowStrategy.BUFFER, adapterFactory);
}

@Override
public Flux<SubscriptionMessage<K, V>> observe(OverflowStrategy overflowStrategy,
Function<FluxSink<SubscriptionMessage<K, V>>, RedisPubSubAdapter<K, V>> adapterFactory) {
return Flux.create(sink -> {

RedisPubSubAdapter<K, V> listener = adapterFactory.apply(sink);

StatefulRedisPubSubConnection<K, V> statefulConnection = getStatefulConnection();
statefulConnection.addListener(listener);

sink.onDispose(() -> statefulConnection.removeListener(listener));

}, overflowStrategy);
}

@Override
public Mono<Void> psubscribe(K... patterns) {
return createMono(() -> commandBuilder.psubscribe(patterns)).then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
* Message payload for a subscription to a channel.
*
* @author Mark Paluch
*
* @deprecated use {@link io.lettuce.core.pubsub.api.reactive.SubscriptionMessage} instead.
*/
@Deprecated
public class ChannelMessage<K, V> {

private final K channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
* Message payload for a subscription to a pattern.
*
* @author Mark Paluch
*
* @deprecated use {@link io.lettuce.core.pubsub.api.reactive.SubscriptionMessage} instead.
*/
@Deprecated
public class PatternMessage<K, V> {

private final K pattern;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.lettuce.core.pubsub.api.reactive;

import io.lettuce.core.pubsub.PubSubMessage;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
Expand All @@ -12,6 +15,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ko Su
* @since 5.0
*/
public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands<K, V> {
Expand All @@ -26,7 +30,10 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
* </p>
*
* @return hot Flux for subscriptions to {@literal pmessage}'s.
*
* @since 8.0 use {@link #observe(Function)} instead.
*/
@Deprecated
Flux<PatternMessage<K, V>> observePatterns();

/**
Expand All @@ -35,7 +42,10 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
*
* @param overflowStrategy the overflow strategy to use.
* @return hot Flux for subscriptions to {@literal pmessage}'s.
*
* @since 8.0 use {@link #observe(FluxSink.OverflowStrategy, Function)} instead.
*/
@Deprecated
Flux<PatternMessage<K, V>> observePatterns(FluxSink.OverflowStrategy overflowStrategy);

/**
Expand All @@ -49,7 +59,10 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
* </p>
*
* @return hot Flux for subscriptions to {@literal message}'s.
*
* @since 8.0 use {@link #observe(Function)} instead.
*/
@Deprecated
Flux<ChannelMessage<K, V>> observeChannels();

/**
Expand All @@ -58,9 +71,57 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
*
* @param overflowStrategy the overflow strategy to use.
* @return hot Flux for subscriptions to {@literal message}'s.
*
* @since 8.0 use {@link #observe(FluxSink.OverflowStrategy, Function)} instead.
*/
@Deprecated
Flux<ChannelMessage<K, V>> observeChannels(FluxSink.OverflowStrategy overflowStrategy);

/**
* Flux for messages received through channel, pattern, or sharded subscriptions.
*
* <p>
* This method allows observing all types of Pub/Sub messages ({@literal message}, {@literal pmessage}, {@literal smessage},
* etc.) using a custom {@link RedisPubSubAdapter}. The connection must be subscribed to one or more channels, patterns, or
* shard channels using {@link #subscribe(Object[])}, {@link #psubscribe(Object[])}, or {@link #ssubscribe(Object[])}.
* </p>
*
* <p>
* Warning! This method uses {@link reactor.core.publisher.FluxSink.OverflowStrategy#BUFFER} This does unbounded buffering
* and may lead to {@link OutOfMemoryError}. Use {@link #observeChannels(FluxSink.OverflowStrategy)} to specify a different
* strategy.
* </p>
*
* @param adapterFactory a factory function that creates a {@link RedisPubSubAdapter} for handling Pub/Sub callbacks using
* the provided {@link FluxSink}.
*
* @return hot Flux for subscriptions to {@literal message}'s.
*
* @since 8.0
*/
Flux<SubscriptionMessage<K, V>> observe(
Function<FluxSink<SubscriptionMessage<K, V>>, RedisPubSubAdapter<K, V>> adapterFactory);

/**
* Flux for messages received through channel, pattern, or sharded subscriptions.
*
* <p>
* This method allows observing all types of Pub/Sub messages ({@literal message}, {@literal pmessage}, {@literal smessage},
* etc.) using a custom {@link RedisPubSubAdapter}. The connection must be subscribed to one or more channels, patterns, or
* shard channels using {@link #subscribe(Object[])}, {@link #psubscribe(Object[])}, or {@link #ssubscribe(Object[])}.
* </p>
*
* @param overflowStrategy the overflow strategy to use when emitting items to the {@link FluxSink}.
* @param adapterFactory a factory function that creates a {@link RedisPubSubAdapter} for handling Pub/Sub callbacks using
* the provided {@link FluxSink}.
*
* @return hot Flux for subscriptions to {@literal message}'s.
*
* @since 8.0
*/
Flux<SubscriptionMessage<K, V>> observe(FluxSink.OverflowStrategy overflowStrategy,
Function<FluxSink<SubscriptionMessage<K, V>>, RedisPubSubAdapter<K, V>> adapterFactory);

/**
* Listen for messages published to channels matching the given patterns. The {@link Mono} completes without a result as
* soon as the pattern subscription is registered.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.lettuce.core.pubsub.api.reactive;

/**
* Message payload for a subscription to a pattern or channel or shared
*
* @author Ko Su
* @since 8.0
*/
public class SubscriptionMessage<K, V> {

/**
* Can be {@code null}.
*/
private final K pattern;

private final K channel;

private final V message;

/**
*
* @param pattern the pattern
* @param channel the channel
* @param message the message
*/
public SubscriptionMessage(K pattern, K channel, V message) {
this.pattern = pattern;
this.channel = channel;
this.message = message;
}

/**
*
* @param channel the channel
* @param message the message
*/
public SubscriptionMessage(K channel, V message) {
this.pattern = null;
this.channel = channel;
this.message = message;
}

/**
*
* @return the pattern
*/
public K getPattern() {
return pattern;
}

/**
*
* @return the channel
*/
public K getChannel() {
return channel;
}

/**
*
* @return the message
*/
public V getMessage() {
return message;
}

}
Loading
Loading