RabbitMQ 笔记——SpringBoot 集成 RabbitMQ 的应用与消息可靠性等实践(进阶篇)

1 概述

本文记录了在 SpringBoot 项目中集成 RabbitMQ 的进阶功能实现,涵盖消息发送与接收、消息可靠性投递、消费端限流、消息超时、死信队列、延迟队列、事务消息以及优先级队列的配置与应用。文章旨在为开发者提供清晰的实践指南,帮助理解 RabbitMQ 在实际项目中的高级用法及消息可靠性保障机制。

2 SpringBoot 与 RabbitMQ 集成

2.1 环境搭建与基本消息收发

本节介绍如何在 SpringBoot 项目中搭建 RabbitMQ 的生产者与消费者模块,并实现基本的消息发送与接收功能。

2.1.1 搭建生产者与消费者模块

  1. 消费者模块配置

    创建模块 module02-boot-consumer,添加以下依赖:

    <dependency>
       <groupId>org.springframework.bootgroupId>
       <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    <dependency>
       <groupId>org.springframework.bootgroupId>
       <artifactId>spring-boot-starter-webartifactId>
    dependency>

    配置 application.yml 文件,连接本地 RabbitMQ 服务:

    spring:
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: 123456
      virtual-host: /
  2. 生产者模块配置

    创建模块 module03-boot-producer,添加以下依赖:

    <dependency>
       <groupId>org.springframework.bootgroupId>
       <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    <dependency>
       <groupId>org.springframework.bootgroupId>
       <artifactId>spring-boot-starter-testartifactId>
       <scope>testscope>
    dependency>

    配置与消费者模块一致的 application.yml 文件。

2.1.2 实现消息接收

在消费者模块中,使用 @RabbitListener 注解创建自定义监听器,接收消息:

package com.sangui.bootconsumer.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-11
*/
@Component
public class MyMessageListener {
   public static final String EXCHANGE_DIRECT = "exchange.direct.order";
   public static final String ROUTING_KEY = "order";
   public static final String QUEUE_NAME = "queue.order";

   @RabbitListener(bindings = @QueueBinding(
           value = @Queue(value = QUEUE_NAME, durable = "true"),
           exchange = @Exchange(value = EXCHANGE_DIRECT),
           key = {ROUTING_KEY}
  ))
   public void processMessage(String dataString, Message message, Channel channel) {
       System.out.println("【日志】消费端接收到消息:" + dataString);
  }
}

说明@RabbitListener 注解通过 QueueBinding 配置绑定关系,durable = "true" 确保队列持久化,防止消息丢失。

2.1.3 实现消息发送

在生产者模块中,使用 RabbitTemplate 发送消息:

package com.sangui.bootproducer;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
* 生产者测试类
* @author sangui
* @date 2025-10-11
*/
@SpringBootTest
class BootProducerApplicationTests {
   public static final String EXCHANGE_DIRECT = "exchange.direct.order";
   public static final String ROUTING_KEY = "order";

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendMessage() {
       rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello World");
  }
}

运行生产者与消费者程序,消费者将接收到消息并输出:

【日志】消费端接收到消息:Hello World

2.2 消息可靠性投递

消息可靠性投递(Message Reliability Delivery)是消息队列系统的核心需求,尤其在电商场景(如用户下单)中,确保消息不丢失、不重复、不错误至关重要。以下分析可能导致消息丢失的三种场景及其解决方案。

2.2.1 消息未到达消息队列(生产者端问题)

可能原因:生产者发送的消息未到达交换机或队列(如交换机或路由键错误)。

解决方案

  1. 生产者确认机制

    创建模块 module04-confirm-producer,配置依赖与消费者模块一致,添加生产者确认相关配置:

    spring:
    rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: 123456
      virtual-host: /
      publisher-confirm-type: correlated
      publisher-returns: true

    配置生产者确认回调:

    package com.sangui.confirmproducer.config;

    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Configuration;
    import jakarta.annotation.PostConstruct;
    import jakarta.annotation.Resource;
    import org.springframework.amqp.core.ReturnedMessage;

    /**
    * RabbitMQ 配置类
    * @author sangui
    * @date 2025-10-11
    */
    @Configuration
    public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
       @Resource
       private RabbitTemplate rabbitTemplate;

       @PostConstruct
       public void init() {
           rabbitTemplate.setConfirmCallback(this);
           rabbitTemplate.setReturnsCallback(this);
      }

       @Override
       public void confirm(CorrelationData correlationData, boolean ack, String cause) {
           System.out.println("【日志】confirm() correlationData: " + correlationData);
           System.out.println("【日志】confirm() ack: " + ack);
           System.out.println("【日志】confirm() cause: " + cause);
           if (ack) {
               System.out.println("【日志】消息成功发送到交换机,数据:" + correlationData);
          } else {
               System.out.println("【日志】消息发送到交换机失败,数据:" + correlationData + ",原因:" + cause);
          }
      }

       @Override
       public void returnedMessage(ReturnedMessage returned) {
           System.out.println("【日志】returnedMessage 消息主体:" + new String(returned.getMessage().getBody()));
           System.out.println("【日志】returnedMessage 应答码:" + returned.getReplyCode());
           System.out.println("【日志】returnedMessage 应答描述:" + returned.getReplyText());
           System.out.println("【日志】returnedMessage 交换机:" + returned.getExchange());
           System.out.println("【日志】returnedMessage 路由键:" + returned.getRoutingKey());
      }
    }

    测试类与 2.1.3 节类似,修改为发送消息 "Hello ConfirmProducer",并测试以下场景:

    • 交换机与路由键正确:消息成功发送到队列。

    • 交换机正确,路由键错误:消息到达交换机但未到达队列,触发 returnedMessage

    • 交换机错误:消息未到达交换机,触发 confirm 回调,ack = false

  2. 备份交换机

    配置备份交换机以处理路由键错误的情况:

    • 创建备份交换机 exchange.test.backup(类型:fanout)。

    • 创建备份队列 queue.test.backup,并绑定到备份交换机(无需路由键)。

    • 删除并重建目标交换机 exchange.direct.order,设置参数 alternate-exchange=exchange.test.backup

    • 将目标交换机绑定到原队列 queue.order,路由键为 order

    测试代码复用 2.1.3 节,发送消息 "测试备用交换机",验证当路由键错误时,消息被投递到备份队列。

2.2.2 消息队列服务器宕机(服务器端问题)

可能原因:RabbitMQ 服务器重启导致内存中消息丢失。

解决方案:启用消息持久化。

RabbitMQ 默认支持队列和消息持久化(durable = true)。通过重启 RabbitMQ 服务器(docker restart rabbitmq)验证,消息在重启后仍保留,证明持久化有效。

2.2.3 消费端异常或宕机(消费者端问题)

可能原因:消费端处理消息失败或宕机,导致消息未被正确消费。

解决方案:配置手动确认机制,支持消息重试。

创建模块 module05-confirm-consumer,配置如下:

spring:
rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: guest
  password: 123456
  virtual-host: /
  listener:
    simple:
      acknowledge-mode: manual

实现手动确认监听器:

package com.sangui.confirmconsumer.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-12
*/
@Component
public class MyMessageListener {
   public static final String EXCHANGE_DIRECT = "exchange.direct.order";
   public static final String ROUTING_KEY = "order";
   public static final String QUEUE_NAME = "queue.order";

   @RabbitListener(bindings = @QueueBinding(
           value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
           exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
           key = {ROUTING_KEY}
  ))
   public void processMessage(String dataString, Message message, Channel channel) throws IOException {
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       try {
           System.out.println("【日志】消费端接收到消息:" + dataString);
           // 模拟异常
           // System.out.println(10 / 0);
           channel.basicAck(deliveryTag, false);
      } catch (Exception e) {
           Boolean redelivered = message.getMessageProperties().getRedelivered();
           if (!redelivered) {
               channel.basicNack(deliveryTag, false, true);
          } else {
               channel.basicReject(deliveryTag, false);
          }
      }
  }
}

处理逻辑

  1. 正常处理:消费端成功处理消息,返回 ACK,消息从队列移除。

  2. 异常处理:若首次投递失败,返回 NACK 并重新入队;若再次失败,返回 Reject,消息不再入队。

  3. 幂等性要求:消费端需支持幂等性以避免重复处理。

测试通过 RabbitMQ 管理界面发送消息:

  • 正常消息:发送 "Test Consumer Confirm ACK",消费者成功接收并返回 ACK。

  • 异常消息:发送 "Test Consumer Confirm NACK",启用异常代码(10 / 0),消息重试一次后被拒绝。

2.3 消费端限流

消费端限流通过设置 prefetch 参数限制每次从队列获取的消息数量,防止消费端因处理能力不足而宕机。

2.3.1 配置限流

创建模块 module06-prefetch,配置如下:

spring:
rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: guest
  password: 123456
  virtual-host: /
  listener:
    simple:
      acknowledge-mode: manual
      prefetch: 1

2.3.2 生产者发送消息

package com.sangui.prefetch;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
* 生产者测试类
* @author sangui
* @date 2025-10-12
*/
@SpringBootTest
class PrefetchApplicationTests {
   public static final String EXCHANGE_DIRECT = "exchange.direct.order";
   public static final String ROUTING_KEY = "order";

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendMessage() {
       for (int i = 0; i < 100; i++) {
           rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello World" + i);
      }
  }
}

2.3.3 消费者处理消息

package com.sangui.prefetch.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-12
*/
@Component
public class MyMessageListener {
   public static final String EXCHANGE_DIRECT = "exchange.direct.order";
   public static final String ROUTING_KEY = "order";
   public static final String QUEUE_NAME = "queue.order";

   @RabbitListener(queues = {QUEUE_NAME})
   public void processMessage(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
       long deliveryTag = message.getMessageProperties().getDeliveryTag();
       System.out.println("【日志】消费端接收到消息:" + dataString);
       TimeUnit.SECONDS.sleep(1);
       channel.basicAck(deliveryTag, false);
  }
}

测试结果

  • 未设置 prefetch:消费者一次性获取 100 条消息,Unacked 从 100 逐步减少。

  • 设置 prefetch=1:消费者每次获取 1 条消息,Ready 逐步减少,Unacked 保持为 1,有效控制并发。

2.4 消息超时

消息超时用于处理长时间未被消费的消息,通过设置过期时间(TTL)自动移除消息,释放资源。

2.4.1 队列层面超时

在 RabbitMQ 管理界面配置:

  • 创建交换机 exchange.test.timeout(类型:Direct)。

  • 创建队列 queue.test.timeout,设置参数 x-message-ttl=5000(5秒)。

  • 绑定交换机与队列,路由键为 routing.key.test.timeout

发送消息:

package com.sangui.prefetch;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
* 生产者测试类
* @author sangui
* @date 2025-10-12
*/
@SpringBootTest
class PrefetchApplicationTests {
   public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";
   public static final String ROUTING_TIMEOUT = "routing.key.test.timeout";

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendMessageForTimeout() {
       for (int i = 0; i < 100; i++) {
           rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_TIMEOUT, "Hello World" + i);
      }
  }
}

测试结果:运行生产者后,队列在 5 秒后清空消息。

2.4.2 消息层面超时

修改发送代码,设置消息级别的 TTL:

@Test
public void testSendMessageForTimeout2() {
   MessagePostProcessor postProcessor = message -> {
       message.getMessageProperties().setExpiration("2500");
       return message;
  };
   for (int i = 0; i < 100; i++) {
       rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_TIMEOUT, "Hello World" + i, postProcessor);
  }
}

测试结果:消息在 2.5 秒后被移除,优先于队列级别的 5 秒 TTL。

说明:当队列和消息均设置 TTL 时,以较短时间为准。

2.5 死信队列

死信队列用于处理无法正常消费的消息(如拒绝、溢出、超时)。常见场景包括订单超时未支付后自动取消。

2.5.1 配置死信队列

在 RabbitMQ 管理界面配置:

  • 死信交换机exchange.dead.letter.video

  • 死信队列queue.dead.letter.video

  • 死信路由键routing.key.dead.letter.video

  • 正常交换机exchange.normal.video

  • 正常队列queue.normal.video,设置参数:

    • x-dead-letter-exchange=exchange.dead.letter.video

    • x-dead-letter-routing-key=routing.key.dead.letter.video

    • x-max-length=10

    • x-message-ttl=10000

  • 正常路由键routing.key.normal.video

2.5.2 测试消息拒绝

创建模块 module07-dead-letter,配置与 2.3.1 节一致。

监听器代码:

package com.sangui.deadletter.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-13
*/
@Component
public class MyMessageListener {
   public static final String EXCHANGE_NORMAL = "exchange.normal.video";
   public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
   public static final String QUEUE_NORMAL = "queue.normal.video";
   public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";

   @RabbitListener(queues = {QUEUE_NORMAL})
   public void processMessageNormal(Message message, Channel channel) throws IOException {
       System.out.println("【日志】正常队列监听:收到消息但拒绝处理");
       channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  }

   @RabbitListener(queues = {QUEUE_DEAD_LETTER})
   public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
       System.out.println("【日志】【死信队列】收到死信消息:" + dataString);
       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  }
}

通过管理界面发送消息 "测试死信情况1:消息被拒绝",输出如下:

【日志】正常队列监听:收到消息但拒绝处理
【日志】【死信队列】收到死信消息:测试死信情况1:消息被拒绝

2.5.3 测试消息溢出与超时

发送 20 条消息,超出队列最大容量(10 条):

package com.sangui.deadletter;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
* 生产者测试类
* @author sangui
* @date 2025-10-13
*/
@SpringBootTest
class DeadLetterApplicationTests {
   public static final String EXCHANGE_NORMAL = "exchange.normal.video";
   public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendMultiMessage() {
       for (int i = 0; i < 20; i++) {
           rabbitTemplate.convertAndSend(EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况2:消息数量超限" + i);
      }
  }
}

测试结果

  • 前 10 秒:正常队列保留 10 条消息,超出的 10 条进入死信队列。

  • 10 秒后:正常队列消息因超时进入死信队列,总计 20 条死信消息。

运行监听器,输出 20 条死信消息的日志。

2.6 延迟队列

延迟队列用于处理需延迟执行的任务,如电商订单超时未支付自动取消。

2.6.1 使用死信队列实现延迟

复用 2.5 节的死信队列配置,通过设置 TTL 实现延迟效果。

2.6.2 使用延迟消息插件

安装 rabbitmq-delayed-message-exchange 插件(支持最长 2 天延迟):

  1. 安装插件

    下载插件并放置到 Docker 映射目录 /var/lib/docker/volumes/rabbitmq-plugin/_data,执行以下命令启用:

    docker exec -it rabbitmq /bin/bash
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    exit
    docker restart rabbitmq
  2. 配置延迟队列

    • 创建交换机 exchange.test.delay(类型:x-delayed-message,参数 x-delayed-type=direct)。

    • 创建队列 queue.test.delay

    • 绑定交换机与队列,路由键为 routing.key.test.delay

  3. 生产者代码

package com.sangui.delayedmessage;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* 生产者测试类
* @author sangui
* @date 2025-10-13
*/
@SpringBootTest
class DelayedMessageApplicationTests {
   public static final String EXCHANGE_DELAY = "exchange.test.delay";
   public static final String ROUTING_KEY_DELAY = "routing.key.test.delay";

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendDelayMessage() {
       rabbitTemplate.convertAndSend(
               EXCHANGE_DELAY,
               ROUTING_KEY_DELAY,
               "测试延迟消息 [" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "]",
               message -> {
                   message.getMessageProperties().setHeader("x-delay", 10000);
                   return message;
              });
  }
}
  1. 消费者代码

package com.sangui.delayedmessage.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-13
*/
@Component
public class MyMessageListener {
   public static final String QUEUE_DELAY = "queue.test.delay";

   @RabbitListener(queues = {QUEUE_DELAY})
   public void processMessage(String dataString, Message message, Channel channel) throws IOException {
       System.out.println("【日志】接收到延迟消息:" + dataString);
       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  }
}

测试结果:消息延迟 10 秒后被消费者接收,输出类似:

【日志】接收到延迟消息:测试延迟消息 [15:38:23]

2.7 事务消息

事务消息旨在确保一组消息要么全部发送成功,要么全部失败,但 RabbitMQ 的事务机制在分布式系统中功能有限。

2.7.1 配置事务

创建模块 module09-tx,配置如下:

spring:
rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: guest
  password: 123456
  virtual-host: /
  listener:
    simple:
      acknowledge-mode: manual

配置事务管理器:

package com.sangui.tx.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 配置类
* @author sangui
* @date 2025-10-13
*/
@Configuration
public class RabbitConfig {
   @Bean
   public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
       return new RabbitTransactionManager(connectionFactory);
  }

   @Bean
   public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
       RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
       rabbitTemplate.setChannelTransacted(true);
       return rabbitTemplate;
  }
}

2.7.2 测试事务

package com.sangui.tx;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.annotation.Transactional;

@SpringBootTest
class TxApplicationTests {
   public static final String EXCHANGE_NAME = "exchange.tx.dragon";
   public static final String ROUTING_KEY = "routing.key.tx.dragon";

   @Resource
   private RabbitTemplate rabbitTemplate;

   @Test
   @Transactional
   public void testSendMessageInTx() {
       rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
       System.out.println("手动抛出异常:" + 10 / 0);
       rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
  }
}

测试结果:启用 @Transactional 后,异常导致事务回滚,两条消息均未发送。

注意:在 JUnit 测试中,需添加 @Rollback(value = false) 注解以提交事务,否则默认回滚。

2.8 优先级队列

优先级队列通过设置消息优先级(x-max-priority)实现高优先级消息优先投递。

2.8.1 配置优先级队列

在 RabbitMQ 管理界面配置:

  • 交换机:exchange.test.priority

  • 队列:queue.test.priority,设置参数 x-max-priority=10

  • 路由键:routing.key.test.priority

2.8.2 发送优先级消息

创建模块 module10-priority,配置与 2.7.1 节一致。

生产者代码:

package com.sangui.priority;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
* 生产者测试类
* @author sangui
* @date 2025-10-13
*/
@SpringBootTest
class PriorityApplicationTests {
   public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
   public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Test
   public void testSendMessage1() {
       rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "优先级 1 的消息", message -> {
           message.getMessageProperties().setPriority(1);
           return message;
      });
  }

   @Test
   public void testSendMessage2() {
       rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "优先级 2 的消息", message -> {
           message.getMessageProperties().setPriority(2);
           return message;
      });
  }

   @Test
   public void testSendMessage3() {
       rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "优先级 3 的消息", message -> {
           message.getMessageProperties().setPriority(3);
           return message;
      });
  }
}

2.8.3 接收优先级消息

package com.sangui.priority.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-13
*/
@Component
public class MyMessageListener {
   public static final String QUEUE_PRIORITY = "queue.test.priority";

   @RabbitListener(queues = {QUEUE_PRIORITY})
   public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
       System.out.println("【日志】接收的消息是:" + data);
       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  }
}

测试结果:消费者按优先级顺序接收消息(优先级 3 > 2 > 1),而不是发送顺序。

3 总结

通过以上实践,我们实现了 SpringBoot 与 RabbitMQ 的集成,涵盖了消息收发、可靠性投递、限流、超时、死信队列、延迟队列、事务消息和优先级队列等功能。这些功能为构建高可靠、高性能的消息系统提供了坚实基础。开发者可根据实际业务场景选择合适的机制,确保消息传递的稳定性和效率。



  • 微信
  • 赶快加我聊天吧
  • QQ
  • 赶快加我聊天吧
  • weinxin
三桂

发表评论 取消回复 您未登录,登录后才能评论,前往登录

    • avatar 三桂 博主

      RabbitMQ 的完整笔记可在 https://github.com/WuSangui571/rabbitmq 中的 README.md 文件浏览~