Springboot+Redis实现消息队列(发布订阅模式)

0 1115

Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

队列主要是为了保证队列中的数据不会被重复消费,消费队列数据一般有主动消费何自动消费,主动消费及利用程序去队列取数据,如秒杀队列任务中,队列存的是秒杀商品的ID,秒杀量是10,那么队列就存10个商品ID,这样每次下单从队列拿到商品ID就可以保证不会超卖,自动消费一般是通过监听器去监听队列,就像rabbitmq等消息队列一样,主要用于程序的异步操作、跨系统通信等业务场景,如秒杀完成后我们需要保存数据到数据库,同时更新库存,这部分操作就可以通过消息队列完成,我们只需要向消息队列中发送一个订单信息如订单ID,库存系统监听这个消息队列,监听到ID后根据ID去实现一些业务逻辑即可实现异步操作,跨系统通信了。

接下来主要是通过程序去实现:

Redis List的主要操作为lpush/lpop/rpush/rpop四种,分别代表从头部和尾部的push/pop,除此之外List还提供了两种pop操作的阻塞版本blpop/brpop,用于阻塞获取一个对象。

阻塞:如果去队列查值,队列不存在,会阻塞在当前程序,指导队列存在后,返回结果。

非阻塞:如果去队列查值,队列不存在,直接返回null,程序不会阻塞。

参考博文:https://blog.csdn.net/qq_42175986/article/details/88417023

以下是基本的代码实现方式(参考博文:https://blog.csdn.net/Muscleheng/article/details/82906617

package com.boot.test1.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* 消息发布类
* @author zhh
*/
@Service
public class Test01Service {

@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* 发布消息
* @param channel 消息信道
* @param message 消息内容
*/
public void sendMessage(String channel, String message) {
stringRedisTemplate.convertAndSend(channel, message);
}
/**
* 发布消息的方法
*/
public void setStr01(){
this.sendMessage("mq_01", "发送信息内容01");
this.sendMessage("mq_01", "发送信息内容011");
this.sendMessage("mq_02", "发送信息内容02");
}

}

package com.boot.common.conf;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import com.boot.test1.redismq.MessageReceiver;

/**
* Redis消息处理配置类
* @author zhh
* @date 2018-09-30
*
*/
@Configuration
public class RedisMQConfig {

/**
* 注入消息监听容器
* @param connectionFactory 连接工厂
* @param listenerAdapter   监听处理器1
* @param listenerAdapter   监听处理器2 (参数名称需和监听处理器的方法名称一致,因为@Bean注解默认注入的id就是方法名称)
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageListenerAdapter listenerAdapter2) {

       RedisMessageListenerContainer container = new RedisMessageListenerContainer();
       container.setConnectionFactory(connectionFactory);
       //订阅一个叫mq_01 的信道
       container.addMessageListener(listenerAdapter, new PatternTopic("mq_01"));
       //订阅一个叫mq_02 的信道
       container.addMessageListener(listenerAdapter2, new PatternTopic("mq_02"));
       //这个container 可以添加多个 messageListener
       return container;
  }

/**
* 消息监听处理器1
* @param receiver 处理器类
* @return
*/
   @Bean
   MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
       //给messageListenerAdapter 传入一个消息接收的处理器,利用反射的方法调用“receiveMessage”
       return new MessageListenerAdapter(receiver, "receiveMessage"); //receiveMessage:接收消息的方法名称
  }

/**
* 消息监听处理器2
* @param receiver 处理器类
* @return
*/
   @Bean
   MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
       //给messageListenerAdapter 传入一个消息接收的处理器,利用反射的方法调用“receiveMessage2”
       return new MessageListenerAdapter(receiver, "receiveMessage2"); //receiveMessage:接收消息的方法名称
  }
}

package com.boot.test1.redismq;

import org.springframework.stereotype.Component;

/**
* MQ消息处理器
* @author zhh
*/
@Component
public class MessageReceiver {
   
   /**
    * 接收消息的方法1
    **/
   public void receiveMessage(String message){
       System.out.println("receiveMessage接收到的消息:"+message);
  }
   
   /**
    * 接收消息的方法2
    **/
   public void receiveMessage2(String message){
       System.out.println("receiveMessage2接收到的消息:"+message);
  }
   
}

值得注意的是:

如果你正常监听到消息,你会发现所有的消息都有一对"",比如你发送一个字符串 order_id,你会发现控制台输出的不是order_id,而是"order_id",所以你需要去做一些处理,比如替换"为空格(我试过不成功),也可以像我一样,在前后加一个特殊字符,然后通过截取,比如order_id 我们实际传 |order_id|,这样实际接收的值是

"|order_id|",我们就可以根据|来截取出中间实际传递需要的信息了。

评论