一 基于RedisConnectionFactory的发布订阅
1.1 订阅方代码
配置代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
@Configuration public class RedisConfig {
@Bean RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(new MessageListenerAdapter(new RedisReceiver(), "onMessage"), new PatternTopic("d?*"));
return container; } }
|
被消费的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j public class RedisReceiver implements MessageListener {
private CommonBaseService commonBaseService;
@Override public void onMessage(Message message, byte[] pattern) { }
public RedisReceiver(CommonBaseService commonBaseService) { this.commonBaseService = commonBaseService; } }
|
1.2 发布方代码
1 2 3 4 5 6
| @Autowired private RedisTemplate<String, Object> redisTemplate;
//发布消息 redisTemplate.convertAndSend(EventConstant.CHANNEL_NAME, behavior);
|
二 使用ReactiveRedisTemplate发布订阅
在项目里加入以下依赖
1 2 3 4 5 6 7 8
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
|
发布订阅的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @Component public class AfterCommond implements CommandLineRunner {
@Autowired private RedisTemplate<String, Object> redisTemplate;
@Autowired private ReactiveRedisTemplate reactiveRedisTemplate;
@Override public void run(String... args) throws Exception {
reactiveRedisTemplate.listenToChannel("demo?").doOnNext(msg -> { System.out.println("接收到的消息?为 " + msg); }).subscribe();
reactiveRedisTemplate.listenToChannel("demo?*").doOnNext(msg -> { System.out.println("接收到的消息?*为 " + msg); }).subscribe();
reactiveRedisTemplate.listenToChannel("demo*").doOnNext(msg -> { System.out.println("接收到的消息*为 " + msg); }).subscribe();
reactiveRedisTemplate.listenToChannel("demo").doOnNext(msg -> { System.out.println("接收到的消息A为 " + msg); }).subscribe();
reactiveRedisTemplate.listenToChannel("/demo").doOnNext(msg -> { System.out.println("接收到的消息B为 " + msg); }).subscribe();
new Thread(() -> { while (true) { try { redisTemplate.convertAndSend("demo", "A" + System.currentTimeMillis()); redisTemplate.convertAndSend("/demo", "B" + System.currentTimeMillis()); Thread.sleep(2000); System.out.println("------->L A"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
} }
|
在控制台可以看到以下日志
1 2 3 4 5 6 7 8 9 10 11 12
| ------->L A 接收到的消息A为 ChannelMessage {channel=demo, message=A1611801361359} 接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801361360} ------->L A 接收到的消息A为 ChannelMessage {channel=demo, message=A1611801363361} 接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801363362} ------->L A 接收到的消息A为 ChannelMessage {channel=demo, message=A1611801365363} 接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801365364} ------->L A 接收到的消息A为 ChannelMessage {channel=demo, message=A1611801367366} 接收到的消息B为 ChannelMessage {channel=/demo, message=B1611801367367}
|
注意:
使用ReactiveRedisTemplate进行订阅时暂不支持通配符模式
三 使用ReactiveRedisMessageListenerContainer订阅
订阅代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Bean public ReactiveRedisMessageListenerContainer reactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory factory) { ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory); Flux<ReactiveSubscription.Message<String, String>> receive = container.receive(ChannelTopic.of("demo")); receive.doOnNext(t -> {
try { ReactiveSubscription.Message ms = t;
System.out.println(new String(ms.getMessage().toString().getBytes("utf-8"), "utf-8"));
System.out.println("------> 接收到的消息为 " + t);
System.out.println("--- " + ms); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } }).subscribe(); return container; }
|
注意:
- 在此模式下,发送消息时注意设置好序列化,否则在接收方容易出现乱码。
- ChannelTopic不支持通配符
如果需要支持通配符,请使用以下模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Bean public ReactiveRedisMessageListenerContainer reactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory factory) { ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory); Flux<ReactiveSubscription.PatternMessage<String, String, String>> receive = container.receive(new PatternTopic[]{PatternTopic.of("de?*")}); receive.doOnNext(t -> {
try { ReactiveSubscription.Message ms = t;
System.out.println("------> 接收到的消息为 " + t);
System.out.println("--- " + ms); } catch (Exception e) { e.printStackTrace(); } }).subscribe(); return container; }
|
此时控制台收到的消息如下:
1 2 3 4 5 6 7 8 9
| ------->L A ------> 接收到的消息为 PatternMessage{channel=demo, pattern=de?*, message="A1611808659713"} --- PatternMessage{channel=demo, pattern=de?*, message="A1611808659713"} ------->L A ------> 接收到的消息为 PatternMessage{channel=demo, pattern=de?*, message="A1611808661715"} --- PatternMessage{channel=demo, pattern=de?*, message="A1611808661715"} ------->L A ------> 接收到的消息为 PatternMessage{channel=demo, pattern=de?*, message="A1611808663717"} --- PatternMessage{channel=demo, pattern=de?*, message="A1611808663717"}
|