专业的JAVA编程教程与资源

网站首页 > java教程 正文

基于Redis实现简单的延时消息队列

temp10 2025-05-16 16:08:47 java教程 3 ℃ 0 评论


说到消息队列相信作为开发人员的大家都不陌生,在实际的工作中我们可能在很多场景下都会用到消息队列,消息队列不仅仅是用于收发消息,而且也可以用于解耦我们的应用系统设计,在大型的应用系统或者分布式应用系统中,我们必然会用到消息队列。

基于Redis实现简单的延时消息队列

总结下,消息队列的应用场景一般有以下几种场景:

  1. 异步处理任务;
  2. 应用系统解耦;
  3. 大流量削峰;
  4. 日志处理系统;
  5. 消息通讯;


目前主流的消息队列框架有:

  • Apache的ActiveMQ;
  • Erlang语言实现的RabbitMQ;
  • Apache的RocketMQ;
  • Apache的Kafka;

这几种主流的消息队列框架各有各的优势,也有略微的不同。



当然,消息队列也有一些特殊的使用场景,比如:一些电商系统中,当用户下单后,需要在规定的时间内对订单发起支付,如果在规定的时间内没有支付订单,那么该订单将自动取消。这个问题的常规解决方案有两个:

  1. 使用定时任务定时去扫描表,修改过期订单的状态。这种方案当然存在许多局限性,当订单数量不多,而且对系统性能要求不高的情况下可以考虑使用。这种方案也不够优雅;
  2. 使用基于消息队列的延时消息队列,这种方案可以对整个消息队列设置一个消息过期时间,也可以给每一个消息设置一个过期时间,这个时候消息的过期时间取决于设置的最小时间;

延时消息队列我们可以采用上面所说的消息队列框架去实现,也可以采用比较简单的基于Redis的方式去实现,众所周知Redis并不是一个消息队列框架,但是Redis在某些应用场景下可以采用其高级特性为我们提供消息队列的特性。


Rdis在常规的应用场景下,我们使用它作为高速缓存框架,搜索百度百科我们可以发现,对Redis的定义如下:

Redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis的最基本用法是我们使用它的Key-Value特性,存储我们的热点数据或者高频访问数据,来达到提高整个应用系统的吞吐量。

就像上面对Redis的定义,zset是对有序集合的操作,我们可以利用这一特性来实现我们的延时消息队列功能。大致实现的原理如下:

Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它便会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。



有了实现原理,下面我们通过一个简单的例子来演示下具体的操作过程,可能对基于Redis实现延时消息队列这一主题有更好的理解,演示的源代码链接在文章末尾附上。

新建工程:delay-message-queue-redis

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.delay.message.queue</groupId>
        <artifactId>delay-message-queue</artifactId>
        <version>0.0.1</version>
    </parent>
    <groupId>com.delay.message.queue</groupId>
    <artifactId>delay-message-queue-redis</artifactId>
    <version>0.0.1</version>
    <name>delay-message-queue-redis</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

操作Redis我们使用RedisTemplate,简单的配置如下:

@Configuration
public class RedisConfig {
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("10.0.0.50", 6379));
    }

    @Bean
    public RedisTemplate redisTemplate() {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        return redisTemplate;
    }
}

定义我们的消息生产者:

@Slf4j
@Service
public class ProducerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void produce(String orderId, long expiredTime) {
        redisTemplate.opsForZSet().add(QUEUE_NAME, orderId, expiredTime);
        //log.info("order id:{} set success", orderId);
        long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
        //log.info("[produce]{} length:{}", QUEUE_NAME, length);
    }
}

消息生产者主要完成的功能:以订单的过期时间为元素(value)的得分,将数据添加到队列中去;

定义我们的消息消费者:

@Slf4j
@Service
public class ConsumerService {
    private static final String QUEUE_NAME = "delay_order_queue";
    @Autowired
    private RedisTemplate redisTemplate;

    public void consume() {
        while (true) {
            Set<String> set = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
            if (set == null || set.isEmpty()) {
                try {
                    //log.info("no data will sleep");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error("InterruptedException", e);
                }
                continue;
            }
            String orderId = set.iterator().next();
            if (redisTemplate.opsForZSet().remove(QUEUE_NAME, orderId) > 0) {
                log.info("order id:{} handle success", orderId);
                long length = redisTemplate.opsForZSet().size(QUEUE_NAME);
                log.info("[consume]{} length:{}", QUEUE_NAME, length);
            }
        }
    }
}

消息消费者的主要功能如下:

  • 循环从Redis的Zset中拿取一个0<=score<=当前时间的元素(value);
  • 如果没有取到值,则整个线程休眠一秒钟;
  • 如果取到值,则从该队列(key)删除取到的元素(value),删除的目的是防止一个数据被重复的消费;然后对取到的值进行后续的数据处理,这里是将数据打印出来。

整个生产者和消费者我们已经实现了,下面我们通过Junit来生产消息和消费消息;

生产消息的过程:

@Slf4j
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducerServiceTest {
    @Autowired
    private ProducerService producerService;

    @Test
    public void produce() {
        Random random = new Random(1);
        for (int i = 0; i < 10; i++) {
            Calendar calendar = Calendar.getInstance();
            // 产生一个随机数
            int time = random.nextInt(100);
            calendar.add(Calendar.SECOND, time);
            String orderId = "order-id-" + i;
            long expired = calendar.getTimeInMillis();
            log.info("order id:{}, expired:{}", orderId, time);
            producerService.produce(orderId, expired);
            try {
                //log.info("no data will sleep");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
            }
        }
    }
}

为了方便我们观察,我们采用在当前时间的基础上加上一个随机的时间作为订单的过期时间,每产生一个订单线程休眠0.5秒,总共产生10个订单。

消息的消费过程:

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ConsumerServiceTest {
    @Autowired
    private ConsumerService consumerService;

    @Test
    public void consume() {
        consumerService.consume();
    }
}

该过程比较简单,它主要是启动我们的循环函数从Redis中取数据。

先启动我们的消息消费过程ConsumerServiceTest,然后再启动我们的消息生产过程ProducerServiceTest,观察日志的打印输出。

消息生产过程:


消息消费过程:


仔细对比我们可以很容易发现:

  1. order-id-5的过期时间最短,它也是最先被消费掉的,其次是order-id-7;
  2. 当生产消息的过程完成后共产生了10个消息,消息消费过程中,每消费一个消息,又没有产生新的消息的时候,整个消息队列的长度在变小;


通过上面的演示,我们实现了基于Redis实现简单的延时消息队列,最主要我们使用了Redis的Zset特性来完成延时消息队列的功能。

上面的演示在高并发场景下,可能会存在问题:

  • 高并发场景下对Redis的操作不够原子性;
  • 不适合分布式应用场景下使用;
  • 适合单体应用中延时消息队列的使用;

基于上面所提到的问题,最好的解决方案是采用消息队列框架去实现,例如:RabbitMQ给延时队列统一设置消息过期时间,过期的消息将被路由到另一个队列中;也可以给每一个消息设置一个过期时间,过期的消息同样路由到另一个队列中。


源代码GitHub地址:

https://github.com/bq-xiao/delay-message-queue


不积跬步,无以至千里;不积小流,无以成江海!

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表