RabbitMQ 笔记——从概念到工作模式实践(基础篇)
消息队列(Message Queue, MQ)是一种应用程序之间的异步通信中间件,用于实现解耦、异步处理和流量削峰等功能。通过引入消息队列,可以将同步操作转化为异步操作,提高系统的灵活性和稳定性。
消息队列的核心作用
以快递场景为例:快递员送达快递时,若收件人正在开会无法即时接收,快递员需等待。这类似于微服务中 A 服务通过 OpenFeign 同步调用 B 服务,若 B 服务未响应,A 服务将阻塞。引入“快递柜”(消息队列)后,快递员将包裹放入柜中即可离开,收件人后续自行取出。
在微服务架构中,A 服务将消息放入队列后即可继续执行其他任务,B 服务从队列消费消息并处理。两者在业务上关联,但执行时异步独立,无需相互等待。这使得架构设计更灵活,支持同步与异步操作的结合。
具体到下单场景:
不使用消息队列:
用户发送下单请求
保存订单
更新购物车
更新库存
更新积分
操作完成
返回响应
使用消息队列:
用户发送下单请求
保存订单
存入消息队列
操作完成
返回响应
后续操作(如更新购物车、库存、积分)由队列异步处理,与用户响应无关,可在系统内部逐步执行。
此外,在流量波动大的场景中,消息队列可将高峰期任务转移到低谷期处理,实现资源利用最大化和系统稳定,以时间换空间。
不使用消息队列的问题
功能耦合度高:下单步骤串行执行,任一环节失败(如库存更新出错)均导致整体失败,影响用户体验。
响应时间长:各步骤串行,总时延为各步骤之和。
并发压力传递:前端并发压力直接传导至后端,所有模块需承受相同负载。
系统弹性不足:新增功能(如数据统计)需修改原有代码,违背开闭原则(OCP),维护成本高。
使用消息队列的优势
功能解耦:核心操作(如生成订单)完成后即可响应,用户体验佳;后续错误在内部处理,不影响前端。
快速响应:仅执行保存订单和入队列,显著缩短时延。
削峰限流:队列缓冲并发,前后模块负载可独立调整(如通过参数控制消费速率),后端操作可缓慢执行。
便于扩展:新增功能仅需订阅队列,无需修改原有代码,符合 OCP 原则。
适用场景与注意事项
消息队列适用于弱关联、可拆分的功能异步处理,而强相关逻辑仍需同步调用。并非所有同步操作均需转为异步,大多数核心业务仍以同步为主。
主流消息队列产品及协议
消息队列通信协议标准包括:
AMQP(Advanced Message Queuing Protocol):通用协议,由 IBM 开发,支持复杂路由。
JMS(Java Message Service):Java 专用标准,由 Sun 开发,一组 Java 接口。
主流 MQ 产品:
RabbitMQ:Erlang 编写,支持 AMQP,灵活性强。
ActiveMQ:Apache 项目,Java 实现,支持 JMS。
RocketMQ:阿里开源,Java 实现,高性能分布式。
Kafka:Apache 项目,Scala & Java 实现,擅长大数据流处理。
本文重点介绍 RabbitMQ。
RabbitMQ 体系结构
RabbitMQ 实例称为 Broker,可划分多个 Virtual Host(虚拟主机,分组隔离)。生产者(Producer)发送消息,消费者(Consumer)接收。生产者和消费者(如 Spring Cloud 微服务)通过 Connection(TCP 连接)通信,为高效复用,引入 Channel(通道)。
核心组件介绍
Exchange(交换机):消息抵达 Broker 的第一站,根据类型路由到 Queue。不存储消息,仅中转。类型包括 Fanout(广播)、Direct(定向)、Topic(通配符)。每个 Virtual Host 可含多个 Exchange。
Queue(队列):消息存储容器,消息从 Exchange 路由至此,消费后删除。每个 Queue 可绑定多个 Exchange。
Binding(绑定):Exchange 与 Queue 的关联规则,包括 Routing Key。
注意:Exchange 不存储消息,若无绑定队列或路由不匹配,消息丢失。
RabbitMQ 安装与简单模式实践
Docker 安装步骤
拉取镜像:
docker pull rabbitmq:3.13-management运行容器:
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
验证:访问 http://localhost:15672,用户名 guest,密码 123456。
简单模式实现 Hello World
简单模式使用默认 Exchange,直接将消息发送到队列(不经显式交换机)。
添加依赖(Maven):
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>生产者代码:
package com.sangui.helloworld.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
String message = "你好呀对面!";
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
channel.close();
connection.close();
}
}效果:控制台输出“已发送消息:你好呀对面!”;管理界面显示 simple_queue 有 1 条消息。
消费者代码:
package com.sangui.helloworld.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("simple_queue", true, consumer);
}
}效果:控制台输出消息内容;管理界面消息数变为 0(已被消费)。
此模式类似于快递柜:生产者放入消息后离场,消费者随时消费。实际开发多用框架(如 Spring AMQP),而非原生代码。
RabbitMQ 工作模式详解
RabbitMQ 官网教程列出 7 种模式,本文重点讲解简单模式(Hello World!)的扩展:Work Queues、Publish/Subscribe、Routing 和 Topics。
Work Queues(工作队列)
扩展简单模式:一个队列绑定多个消费者,消息竞争消费(轮询分发)。适用于任务重载场景,提高处理效率。
生产者工具类(ConnectionUtil):
package com.sangui.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static final String HOST_ADDRESS = "127.0.0.1";
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_ADDRESS);
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
return factory.newConnection();
}
}生产者:
package com.sangui.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sangui.util.ConnectionUtil;
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 10; i++) {
String body = i + "你好,rabbitmq,我来演示 work queues";
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
}
channel.close();
connection.close();
}
}效果:管理界面显示 10 条消息。
消费者(Consumer1 和 Consumer2 代码类似,仅打印差异):
package com.sangui.work;
import com.rabbitmq.client.*;
import com.sangui.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}运行:先启消费者,再启生产者。结果:消息奇偶分担,竞争消费。
Publish/Subscribe(发布订阅)
使用 Fanout Exchange 广播消息到多个队列,每个队列消费者独立接收(非竞争)。
生产者:
package com.sangui.fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sangui.util.ConnectionUtil;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
String body = "日志信息:张三调用了 findAll 方法...日志级别:info...";
channel.basicPublish(exchangeName, "", null, body.getBytes());
channel.close();
connection.close();
}
}消费者(Consumer1 和 Consumer2 类似,仅队列和打印差异):
package com.sangui.fanout;
import com.rabbitmq.client.*;
import com.sangui.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}效果:两者均接收消息。区别于 Work Queues:广播 vs 竞争;显式 Exchange vs 默认。
Routing(路由模式)
使用 Direct Exchange,根据 Routing Key 精确路由。
生产者:
package com.sangui.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sangui.util.ConnectionUtil;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
String message = "日志信息:张三调用了 delete 方法.错误了,日志级别 warning";
channel.basicPublish(exchangeName, "warning", null, message.getBytes());
channel.close();
connection.close();
}
}消费者(类似,仅队列差异):
package com.sangui.routing;
import com.rabbitmq.client.*;
import com.sangui.util.ConnectionUtil;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue2Name, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name, true, consumer);
}
}效果:仅匹配 warning 的队列 2 消费。支持定点或多播。
Topics(主题模式)
Topic Exchange 使用通配符 Routing Key:*(匹配一个词)、#(匹配零或多个词)。Routing Key 格式如 item.insert。
生产者:
package com.sangui.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sangui.util.ConnectionUtil;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
String body1 = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName, "order.info", null, body1.getBytes());
String body2 = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName, "goods.info", null, body2.getBytes());
String body3 = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName, "goods.error", null, body3.getBytes());
channel.close();
connection.close();
}
}消费者(类似,仅队列差异):
package com.sangui.topic;
import com.rabbitmq.client.*;
import com.sangui.util.ConnectionUtil;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queueName = "test_topic_queue1";
channel.queueDeclare(queueName, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(queueName, true, consumer);
}
}效果:队列 1 接收 order.info 和 goods.error;队列 2 接收全部。灵活匹配动态路由。
其他模式简述
RPC:同步调用,非典型异步,不展开。
Publisher Confirms:发送确认,后续可靠性章节详述。
工作模式小结
直接到队列:默认 Exchange,简单/Work Queues。
经 Exchange:
Fanout:广播,无 Routing Key。
Direct:精确 Routing Key,定点/多播。
Topic:通配符 Routing Key。
- 微信
- 赶快加我聊天吧

- 赶快加我聊天吧
