RabbitMQ 笔记——SpringBoot 集成 RabbitMQ 的应用与消息可靠性等实践(进阶篇)
本文记录了在 SpringBoot 项目中集成 RabbitMQ 的进阶功能实现,涵盖消息发送与接收、消息可靠性投递、消费端限流、消息超时、死信队列、延迟队列、事务消息以及优先级队列的配置与应用。文章旨在为开发者提供清晰的实践指南,帮助理解 RabbitMQ 在实际项目中的高级用法及消息可靠性保障机制。
2 SpringBoot 与 RabbitMQ 集成
2.1 环境搭建与基本消息收发
本节介绍如何在 SpringBoot 项目中搭建 RabbitMQ 的生产者与消费者模块,并实现基本的消息发送与接收功能。
2.1.1 搭建生产者与消费者模块
消费者模块配置
创建模块
module02-boot-consumer,添加以下依赖:<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>配置
application.yml文件,连接本地 RabbitMQ 服务:spring
rabbitmq
host127.0.0.1
port5672
usernameguest
password123456
virtual-host/生产者模块配置
创建模块
module03-boot-producer,添加以下依赖:<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>配置与消费者模块一致的
application.yml文件。
2.1.2 实现消息接收
在消费者模块中,使用 @RabbitListener 注解创建自定义监听器,接收消息:
package com.sangui.bootconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-11
*/
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
(bindings = (
value = (value = QUEUE_NAME, durable = "true"),
exchange = (value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {
System.out.println("【日志】消费端接收到消息:" + dataString);
}
}说明:
@RabbitListener注解通过QueueBinding配置绑定关系,durable = "true"确保队列持久化,防止消息丢失。
2.1.3 实现消息发送
在生产者模块中,使用 RabbitTemplate 发送消息:
package com.sangui.bootproducer;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 生产者测试类
* @author sangui
* @date 2025-10-11
*/
class BootProducerApplicationTests {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
private RabbitTemplate rabbitTemplate;
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello World");
}
}运行生产者与消费者程序,消费者将接收到消息并输出:
【日志】消费端接收到消息:Hello World2.2 消息可靠性投递
消息可靠性投递(Message Reliability Delivery)是消息队列系统的核心需求,尤其在电商场景(如用户下单)中,确保消息不丢失、不重复、不错误至关重要。以下分析可能导致消息丢失的三种场景及其解决方案。
2.2.1 消息未到达消息队列(生产者端问题)
可能原因:生产者发送的消息未到达交换机或队列(如交换机或路由键错误)。
解决方案:
生产者确认机制
创建模块
module04-confirm-producer,配置依赖与消费者模块一致,添加生产者确认相关配置:spring
rabbitmq
host127.0.0.1
port5672
usernameguest
password123456
virtual-host/
publisher-confirm-typecorrelated
publisher-returnstrue配置生产者确认回调:
package com.sangui.confirmproducer.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.ReturnedMessage;
/**
* RabbitMQ 配置类
* @author sangui
* @date 2025-10-11
*/
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
private RabbitTemplate rabbitTemplate;
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("【日志】confirm() correlationData: " + correlationData);
System.out.println("【日志】confirm() ack: " + ack);
System.out.println("【日志】confirm() cause: " + cause);
if (ack) {
System.out.println("【日志】消息成功发送到交换机,数据:" + correlationData);
} else {
System.out.println("【日志】消息发送到交换机失败,数据:" + correlationData + ",原因:" + cause);
}
}
public void returnedMessage(ReturnedMessage returned) {
System.out.println("【日志】returnedMessage 消息主体:" + new String(returned.getMessage().getBody()));
System.out.println("【日志】returnedMessage 应答码:" + returned.getReplyCode());
System.out.println("【日志】returnedMessage 应答描述:" + returned.getReplyText());
System.out.println("【日志】returnedMessage 交换机:" + returned.getExchange());
System.out.println("【日志】returnedMessage 路由键:" + returned.getRoutingKey());
}
}测试类与 2.1.3 节类似,修改为发送消息
"Hello ConfirmProducer",并测试以下场景:交换机与路由键正确:消息成功发送到队列。
交换机正确,路由键错误:消息到达交换机但未到达队列,触发
returnedMessage。交换机错误:消息未到达交换机,触发
confirm回调,ack = false。
备份交换机
配置备份交换机以处理路由键错误的情况:
创建备份交换机
exchange.test.backup(类型:fanout)。创建备份队列
queue.test.backup,并绑定到备份交换机(无需路由键)。删除并重建目标交换机
exchange.direct.order,设置参数alternate-exchange=exchange.test.backup。将目标交换机绑定到原队列
queue.order,路由键为order。
测试代码复用 2.1.3 节,发送消息
"测试备用交换机",验证当路由键错误时,消息被投递到备份队列。
2.2.2 消息队列服务器宕机(服务器端问题)
可能原因:RabbitMQ 服务器重启导致内存中消息丢失。
解决方案:启用消息持久化。
RabbitMQ 默认支持队列和消息持久化(durable = true)。通过重启 RabbitMQ 服务器(docker restart rabbitmq)验证,消息在重启后仍保留,证明持久化有效。
2.2.3 消费端异常或宕机(消费者端问题)
可能原因:消费端处理消息失败或宕机,导致消息未被正确消费。
解决方案:配置手动确认机制,支持消息重试。
创建模块 module05-confirm-consumer,配置如下:
spring
rabbitmq
host127.0.0.1
port5672
usernameguest
password123456
virtual-host/
listener
simple
acknowledge-modemanual实现手动确认监听器:
package com.sangui.confirmconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-12
*/
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
(bindings = (
value = (value = QUEUE_NAME, durable = "true", autoDelete = "false"),
exchange = (value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("【日志】消费端接收到消息:" + dataString);
// 模拟异常
// System.out.println(10 / 0);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (!redelivered) {
channel.basicNack(deliveryTag, false, true);
} else {
channel.basicReject(deliveryTag, false);
}
}
}
}处理逻辑:
正常处理:消费端成功处理消息,返回 ACK,消息从队列移除。
异常处理:若首次投递失败,返回 NACK 并重新入队;若再次失败,返回 Reject,消息不再入队。
幂等性要求:消费端需支持幂等性以避免重复处理。
测试通过 RabbitMQ 管理界面发送消息:
正常消息:发送
"Test Consumer Confirm ACK",消费者成功接收并返回 ACK。异常消息:发送
"Test Consumer Confirm NACK",启用异常代码(10 / 0),消息重试一次后被拒绝。
2.3 消费端限流
消费端限流通过设置 prefetch 参数限制每次从队列获取的消息数量,防止消费端因处理能力不足而宕机。
2.3.1 配置限流
创建模块 module06-prefetch,配置如下:
spring
rabbitmq
host127.0.0.1
port5672
usernameguest
password123456
virtual-host/
listener
simple
acknowledge-modemanual
prefetch12.3.2 生产者发送消息
package com.sangui.prefetch;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 生产者测试类
* @author sangui
* @date 2025-10-12
*/
class PrefetchApplicationTests {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
private RabbitTemplate rabbitTemplate;
public void testSendMessage() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY, "Hello World" + i);
}
}
}2.3.3 消费者处理消息
package com.sangui.prefetch.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-12
*/
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
(queues = {QUEUE_NAME})
public void processMessage(String dataString, Message message, Channel channel) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("【日志】消费端接收到消息:" + dataString);
TimeUnit.SECONDS.sleep(1);
channel.basicAck(deliveryTag, false);
}
}测试结果:
未设置
prefetch:消费者一次性获取 100 条消息,Unacked从 100 逐步减少。设置
prefetch=1:消费者每次获取 1 条消息,Ready逐步减少,Unacked保持为 1,有效控制并发。
2.4 消息超时
消息超时用于处理长时间未被消费的消息,通过设置过期时间(TTL)自动移除消息,释放资源。
2.4.1 队列层面超时
在 RabbitMQ 管理界面配置:
创建交换机
exchange.test.timeout(类型:Direct)。创建队列
queue.test.timeout,设置参数x-message-ttl=5000(5秒)。绑定交换机与队列,路由键为
routing.key.test.timeout。
发送消息:
package com.sangui.prefetch;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 生产者测试类
* @author sangui
* @date 2025-10-12
*/
class PrefetchApplicationTests {
public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";
public static final String ROUTING_TIMEOUT = "routing.key.test.timeout";
private RabbitTemplate rabbitTemplate;
public void testSendMessageForTimeout() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_TIMEOUT, "Hello World" + i);
}
}
}测试结果:运行生产者后,队列在 5 秒后清空消息。
2.4.2 消息层面超时
修改发送代码,设置消息级别的 TTL:
public void testSendMessageForTimeout2() {
MessagePostProcessor postProcessor = message -> {
message.getMessageProperties().setExpiration("2500");
return message;
};
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_TIMEOUT, "Hello World" + i, postProcessor);
}
}测试结果:消息在 2.5 秒后被移除,优先于队列级别的 5 秒 TTL。
说明:当队列和消息均设置 TTL 时,以较短时间为准。
2.5 死信队列
死信队列用于处理无法正常消费的消息(如拒绝、溢出、超时)。常见场景包括订单超时未支付后自动取消。
2.5.1 配置死信队列
在 RabbitMQ 管理界面配置:
死信交换机:
exchange.dead.letter.video死信队列:
queue.dead.letter.video死信路由键:
routing.key.dead.letter.video正常交换机:
exchange.normal.video正常队列:
queue.normal.video,设置参数:x-dead-letter-exchange=exchange.dead.letter.videox-dead-letter-routing-key=routing.key.dead.letter.videox-max-length=10x-message-ttl=10000
正常路由键:
routing.key.normal.video
2.5.2 测试消息拒绝
创建模块 module07-dead-letter,配置与 2.3.1 节一致。
监听器代码:
package com.sangui.deadletter.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-13
*/
public class MyMessageListener {
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
System.out.println("【日志】正常队列监听:收到消息但拒绝处理");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
System.out.println("【日志】【死信队列】收到死信消息:" + dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}通过管理界面发送消息 "测试死信情况1:消息被拒绝",输出如下:
【日志】正常队列监听:收到消息但拒绝处理
【日志】【死信队列】收到死信消息:测试死信情况1:消息被拒绝
2.5.3 测试消息溢出与超时
发送 20 条消息,超出队列最大容量(10 条):
package com.sangui.deadletter;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 生产者测试类
* @author sangui
* @date 2025-10-13
*/
class DeadLetterApplicationTests {
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
private RabbitTemplate rabbitTemplate;
public void testSendMultiMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况2:消息数量超限" + i);
}
}
}测试结果:
前 10 秒:正常队列保留 10 条消息,超出的 10 条进入死信队列。
10 秒后:正常队列消息因超时进入死信队列,总计 20 条死信消息。
运行监听器,输出 20 条死信消息的日志。
2.6 延迟队列
延迟队列用于处理需延迟执行的任务,如电商订单超时未支付自动取消。
2.6.1 使用死信队列实现延迟
复用 2.5 节的死信队列配置,通过设置 TTL 实现延迟效果。
2.6.2 使用延迟消息插件
安装 rabbitmq-delayed-message-exchange 插件(支持最长 2 天延迟):
安装插件
下载插件并放置到 Docker 映射目录
/var/lib/docker/volumes/rabbitmq-plugin/_data,执行以下命令启用:docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq配置延迟队列
创建交换机
exchange.test.delay(类型:x-delayed-message,参数x-delayed-type=direct)。创建队列
queue.test.delay。绑定交换机与队列,路由键为
routing.key.test.delay。
生产者代码
package com.sangui.delayedmessage;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 生产者测试类
* @author sangui
* @date 2025-10-13
*/
class DelayedMessageApplicationTests {
public static final String EXCHANGE_DELAY = "exchange.test.delay";
public static final String ROUTING_KEY_DELAY = "routing.key.test.delay";
private RabbitTemplate rabbitTemplate;
public void testSendDelayMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DELAY,
ROUTING_KEY_DELAY,
"测试延迟消息 [" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "]",
message -> {
message.getMessageProperties().setHeader("x-delay", 10000);
return message;
});
}
}消费者代码
package com.sangui.delayedmessage.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-13
*/
public class MyMessageListener {
public static final String QUEUE_DELAY = "queue.test.delay";
(queues = {QUEUE_DELAY})
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
System.out.println("【日志】接收到延迟消息:" + dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}测试结果:消息延迟 10 秒后被消费者接收,输出类似:
【日志】接收到延迟消息:测试延迟消息 [15:38:23]2.7 事务消息
事务消息旨在确保一组消息要么全部发送成功,要么全部失败,但 RabbitMQ 的事务机制在分布式系统中功能有限。
2.7.1 配置事务
创建模块 module09-tx,配置如下:
spring
rabbitmq
host127.0.0.1
port5672
usernameguest
password123456
virtual-host/
listener
simple
acknowledge-modemanual配置事务管理器:
package com.sangui.tx.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
* @author sangui
* @date 2025-10-13
*/
public class RabbitConfig {
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}2.7.2 测试事务
package com.sangui.tx;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.annotation.Transactional;
class TxApplicationTests {
public static final String EXCHANGE_NAME = "exchange.tx.dragon";
public static final String ROUTING_KEY = "routing.key.tx.dragon";
private RabbitTemplate rabbitTemplate;
public void testSendMessageInTx() {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
System.out.println("手动抛出异常:" + 10 / 0);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
}
}测试结果:启用 @Transactional 后,异常导致事务回滚,两条消息均未发送。
注意:在 JUnit 测试中,需添加
@Rollback(value = false)注解以提交事务,否则默认回滚。
2.8 优先级队列
优先级队列通过设置消息优先级(x-max-priority)实现高优先级消息优先投递。
2.8.1 配置优先级队列
在 RabbitMQ 管理界面配置:
交换机:
exchange.test.priority队列:
queue.test.priority,设置参数x-max-priority=10路由键:
routing.key.test.priority
2.8.2 发送优先级消息
创建模块 module10-priority,配置与 2.7.1 节一致。
生产者代码:
package com.sangui.priority;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 生产者测试类
* @author sangui
* @date 2025-10-13
*/
class PriorityApplicationTests {
public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
private RabbitTemplate rabbitTemplate;
public void testSendMessage1() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "优先级 1 的消息", message -> {
message.getMessageProperties().setPriority(1);
return message;
});
}
public void testSendMessage2() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "优先级 2 的消息", message -> {
message.getMessageProperties().setPriority(2);
return message;
});
}
public void testSendMessage3() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "优先级 3 的消息", message -> {
message.getMessageProperties().setPriority(3);
return message;
});
}
}2.8.3 接收优先级消息
package com.sangui.priority.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 自定义消息监听器
* @author sangui
* @date 2025-10-13
*/
public class MyMessageListener {
public static final String QUEUE_PRIORITY = "queue.test.priority";
(queues = {QUEUE_PRIORITY})
public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
System.out.println("【日志】接收的消息是:" + data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}测试结果:消费者按优先级顺序接收消息(优先级 3 > 2 > 1),而不是发送顺序。
3 总结
通过以上实践,我们实现了 SpringBoot 与 RabbitMQ 的集成,涵盖了消息收发、可靠性投递、限流、超时、死信队列、延迟队列、事务消息和优先级队列等功能。这些功能为构建高可靠、高性能的消息系统提供了坚实基础。开发者可根据实际业务场景选择合适的机制,确保消息传递的稳定性和效率。
- 微信
- 赶快加我聊天吧

- 赶快加我聊天吧

2025年10月13日 17:46:36 1楼
RabbitMQ 的完整笔记可在 https://github.com/WuSangui571/rabbitmq 中的 README.md 文件浏览~