RabbitMQ 笔记——从概念到工作模式实践(基础篇)

消息队列简介

消息队列(Message Queue, MQ)是一种应用程序之间的异步通信中间件,用于实现解耦、异步处理和流量削峰等功能。通过引入消息队列,可以将同步操作转化为异步操作,提高系统的灵活性和稳定性。

消息队列的核心作用

以快递场景为例:快递员送达快递时,若收件人正在开会无法即时接收,快递员需等待。这类似于微服务中 A 服务通过 OpenFeign 同步调用 B 服务,若 B 服务未响应,A 服务将阻塞。引入“快递柜”(消息队列)后,快递员将包裹放入柜中即可离开,收件人后续自行取出。

在微服务架构中,A 服务将消息放入队列后即可继续执行其他任务,B 服务从队列消费消息并处理。两者在业务上关联,但执行时异步独立,无需相互等待。这使得架构设计更灵活,支持同步与异步操作的结合。

具体到下单场景:

  • 不使用消息队列:

    1. 用户发送下单请求

    2. 保存订单

    3. 更新购物车

    4. 更新库存

    5. 更新积分

    6. 操作完成

    7. 返回响应

  • 使用消息队列:

    1. 用户发送下单请求

    2. 保存订单

    3. 存入消息队列

    4. 操作完成

    5. 返回响应

后续操作(如更新购物车、库存、积分)由队列异步处理,与用户响应无关,可在系统内部逐步执行。

此外,在流量波动大的场景中,消息队列可将高峰期任务转移到低谷期处理,实现资源利用最大化和系统稳定,以时间换空间。

不使用消息队列的问题

  1. 功能耦合度高:下单步骤串行执行,任一环节失败(如库存更新出错)均导致整体失败,影响用户体验。

  2. 响应时间长:各步骤串行,总时延为各步骤之和。

  3. 并发压力传递:前端并发压力直接传导至后端,所有模块需承受相同负载。

  4. 系统弹性不足:新增功能(如数据统计)需修改原有代码,违背开闭原则(OCP),维护成本高。

使用消息队列的优势

  1. 功能解耦:核心操作(如生成订单)完成后即可响应,用户体验佳;后续错误在内部处理,不影响前端。

  2. 快速响应:仅执行保存订单和入队列,显著缩短时延。

  3. 削峰限流:队列缓冲并发,前后模块负载可独立调整(如通过参数控制消费速率),后端操作可缓慢执行。

  4. 便于扩展:新增功能仅需订阅队列,无需修改原有代码,符合 OCP 原则。

适用场景与注意事项

消息队列适用于弱关联、可拆分的功能异步处理,而强相关逻辑仍需同步调用。并非所有同步操作均需转为异步,大多数核心业务仍以同步为主。

主流消息队列产品及协议

消息队列通信协议标准包括:

  • AMQP(Advanced Message Queuing Protocol):通用协议,由 IBM 开发,支持复杂路由。

  • JMS(Java Message Service):Java 专用标准,由 Sun 开发,一组 Java 接口。

主流 MQ 产品:

  • RabbitMQ:Erlang 编写,支持 AMQP,灵活性强。

  • ActiveMQ:Apache 项目,Java 实现,支持 JMS。

  • RocketMQ:阿里开源,Java 实现,高性能分布式。

  • Kafka:Apache 项目,Scala & Java 实现,擅长大数据流处理。

本文重点介绍 RabbitMQ。

RabbitMQ 体系结构

RabbitMQ 实例称为 Broker,可划分多个 Virtual Host(虚拟主机,分组隔离)。生产者(Producer)发送消息,消费者(Consumer)接收。生产者和消费者(如 Spring Cloud 微服务)通过 Connection(TCP 连接)通信,为高效复用,引入 Channel(通道)。

核心组件介绍

  • Exchange(交换机):消息抵达 Broker 的第一站,根据类型路由到 Queue。不存储消息,仅中转。类型包括 Fanout(广播)、Direct(定向)、Topic(通配符)。每个 Virtual Host 可含多个 Exchange。

  • Queue(队列):消息存储容器,消息从 Exchange 路由至此,消费后删除。每个 Queue 可绑定多个 Exchange。

  • Binding(绑定):Exchange 与 Queue 的关联规则,包括 Routing Key。

注意:Exchange 不存储消息,若无绑定队列或路由不匹配,消息丢失。

RabbitMQ 安装与简单模式实践

Docker 安装步骤

  1. 拉取镜像:

    docker pull rabbitmq:3.13-management
  2. 运行容器:

    docker run -d \
    --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    -v rabbitmq-plugin:/plugins \
    -e RABBITMQ_DEFAULT_USER=guest \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    rabbitmq:3.13-management

验证:访问 http://localhost:15672,用户名 guest,密码 123456。

简单模式实现 Hello World

简单模式使用默认 Exchange,直接将消息发送到队列(不经显式交换机)。

  1. 添加依赖(Maven):

    <dependencies>
       <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>5.20.0</version>
       </dependency>
    </dependencies>
  2. 生产者代码:

    package com.sangui.helloworld.simple;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Producer {
       public static void main(String[] args) throws Exception {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost("127.0.0.1");
           factory.setPort(5672);
           factory.setVirtualHost("/");
           factory.setUsername("guest");
           factory.setPassword("123456");

           Connection connection = factory.newConnection();
           Channel channel = connection.createChannel();

           channel.queueDeclare("simple_queue", true, false, false, null);

           String message = "你好呀对面!";
           channel.basicPublish("", "simple_queue", null, message.getBytes());
           System.out.println("已发送消息:" + message);

           channel.close();
           connection.close();
      }
    }

    效果:控制台输出“已发送消息:你好呀对面!”;管理界面显示 simple_queue 有 1 条消息。

  3. 消费者代码:

    package com.sangui.helloworld.simple;

    import com.rabbitmq.client.*;

    import java.io.IOException;

    public class Consumer {
       public static void main(String[] args) throws Exception {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost("127.0.0.1");
           factory.setPort(5672);
           factory.setVirtualHost("/");
           factory.setUsername("guest");
           factory.setPassword("123456");

           Connection connection = factory.newConnection();
           Channel channel = connection.createChannel();

           channel.queueDeclare("simple_queue", true, false, false, null);

           DefaultConsumer consumer = new DefaultConsumer(channel) {
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   System.out.println("body:" + new String(body));
              }
          };

           channel.basicConsume("simple_queue", true, consumer);
      }
    }

    效果:控制台输出消息内容;管理界面消息数变为 0(已被消费)。

此模式类似于快递柜:生产者放入消息后离场,消费者随时消费。实际开发多用框架(如 Spring AMQP),而非原生代码。

RabbitMQ 工作模式详解

RabbitMQ 官网教程列出 7 种模式,本文重点讲解简单模式(Hello World!)的扩展:Work Queues、Publish/Subscribe、Routing 和 Topics。

Work Queues(工作队列)

扩展简单模式:一个队列绑定多个消费者,消息竞争消费(轮询分发)。适用于任务重载场景,提高处理效率。

  • 生产者工具类(ConnectionUtil):

    package com.sangui.util;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class ConnectionUtil {
       public static final String HOST_ADDRESS = "127.0.0.1";

       public static Connection getConnection() throws Exception {
           ConnectionFactory factory = new ConnectionFactory();
           factory.setHost(HOST_ADDRESS);
           factory.setPort(5672);
           factory.setVirtualHost("/");
           factory.setUsername("guest");
           factory.setPassword("123456");
           return factory.newConnection();
      }
    }
  • 生产者:

    package com.sangui.work;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sangui.util.ConnectionUtil;

    public class Producer {
       public static final String QUEUE_NAME = "work_queue";

       public static void main(String[] args) throws Exception {
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
           channel.queueDeclare(QUEUE_NAME, true, false, false, null);

           for (int i = 1; i <= 10; i++) {
               String body = i + "你好,rabbitmq,我来演示 work queues";
               channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
          }

           channel.close();
           connection.close();
      }
    }

    效果:管理界面显示 10 条消息。

  • 消费者(Consumer1 和 Consumer2 代码类似,仅打印差异):

    package com.sangui.work;

    import com.rabbitmq.client.*;
    import com.sangui.util.ConnectionUtil;

    import java.io.IOException;

    public class Consumer1 {
       static final String QUEUE_NAME = "work_queue";

       public static void main(String[] args) throws Exception {
           Connection connection = ConnectionUtil.getConnection();
           Channel channel = connection.createChannel();
           channel.queueDeclare(QUEUE_NAME, true, false, false, null);

           Consumer consumer = new DefaultConsumer(channel) {
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   System.out.println("Consumer1 body:" + new String(body));
              }
          };
           channel.basicConsume(QUEUE_NAME, true, consumer);
      }
    }

    运行:先启消费者,再启生产者。结果:消息奇偶分担,竞争消费。

Publish/Subscribe(发布订阅)

使用 Fanout Exchange 广播消息到多个队列,每个队列消费者独立接收(非竞争)。

  • 生产者:

    package com.sangui.fanout;

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sangui.util.ConnectionUtil;

    public class Producer {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    String exchangeName = "test_fanout";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);

    String queue1Name = "test_fanout_queue1";
    String queue2Name = "test_fanout_queue2";
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);

    channel.queueBind(queue1Name, exchangeName, "");
    channel.queueBind(queue2Name, exchangeName, "");

    String body = "日志信息:张三调用了 findAll 方法...日志级别:info...";
    channel.basicPublish(exchangeName, "", null, body.getBytes());

    channel.close();
    connection.close();
    }
    }
  • 消费者(Consumer1 和 Consumer2 类似,仅队列和打印差异):

    package com.sangui.fanout;

    import com.rabbitmq.client.*;
    import com.sangui.util.ConnectionUtil;

    import java.io.IOException;

    public class Consumer1 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queue1Name = "test_fanout_queue1";
    channel.queueDeclare(queue1Name, true, false, false, null);

    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body:" + new String(body));
    System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
    }
    };
    channel.basicConsume(queue1Name, true, consumer);
    }
    }

    效果:两者均接收消息。区别于 Work Queues:广播 vs 竞争;显式 Exchange vs 默认。

Routing(路由模式)

使用 Direct Exchange,根据 Routing Key 精确路由。

  • 生产者:

    package com.sangui.routing;

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sangui.util.ConnectionUtil;

    public class Producer {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_direct";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

    String queue1Name = "test_direct_queue1";
    String queue2Name = "test_direct_queue2";
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);

    channel.queueBind(queue1Name, exchangeName, "error");
    channel.queueBind(queue2Name, exchangeName, "info");
    channel.queueBind(queue2Name, exchangeName, "error");
    channel.queueBind(queue2Name, exchangeName, "warning");

    String message = "日志信息:张三调用了 delete 方法.错误了,日志级别 warning";
    channel.basicPublish(exchangeName, "warning", null, message.getBytes());

    channel.close();
    connection.close();
    }
    }
  • 消费者(类似,仅队列差异):

    package com.sangui.routing;

    import com.rabbitmq.client.*;
    import com.sangui.util.ConnectionUtil;

    import java.io.IOException;

    public class Consumer2 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queue2Name = "test_direct_queue2";
    channel.queueDeclare(queue2Name, true, false, false, null);

    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body:" + new String(body));
    System.out.println("Consumer2 将日志信息存储到数据库.....");
    }
    };
    channel.basicConsume(queue2Name, true, consumer);
    }
    }

    效果:仅匹配 warning 的队列 2 消费。支持定点或多播。

Topics(主题模式)

Topic Exchange 使用通配符 Routing Key:*(匹配一个词)、#(匹配零或多个词)。Routing Key 格式如 item.insert。

  • 生产者:

    package com.sangui.topic;

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sangui.util.ConnectionUtil;

    public class Producer {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_topic";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

    String queue1Name = "test_topic_queue1";
    String queue2Name = "test_topic_queue2";
    channel.queueDeclare(queue1Name, true, false, false, null);
    channel.queueDeclare(queue2Name, true, false, false, null);

    channel.queueBind(queue1Name, exchangeName, "#.error");
    channel.queueBind(queue1Name, exchangeName, "order.*");
    channel.queueBind(queue2Name, exchangeName, "*.*");

    String body1 = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
    channel.basicPublish(exchangeName, "order.info", null, body1.getBytes());

    String body2 = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
    channel.basicPublish(exchangeName, "goods.info", null, body2.getBytes());

    String body3 = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
    channel.basicPublish(exchangeName, "goods.error", null, body3.getBytes());

    channel.close();
    connection.close();
    }
    }
  • 消费者(类似,仅队列差异):

    package com.sangui.topic;

    import com.rabbitmq.client.*;
    import com.sangui.util.ConnectionUtil;

    import java.io.IOException;

    public class Consumer1 {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    String queueName = "test_topic_queue1";
    channel.queueDeclare(queueName, true, false, false, null);

    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body:" + new String(body));
    }
    };
    channel.basicConsume(queueName, true, consumer);
    }
    }

    效果:队列 1 接收 order.info 和 goods.error;队列 2 接收全部。灵活匹配动态路由。

其他模式简述

  • RPC:同步调用,非典型异步,不展开。

  • Publisher Confirms:发送确认,后续可靠性章节详述。

工作模式小结

  • 直接到队列:默认 Exchange,简单/Work Queues。

  • 经 Exchange:

    • Fanout:广播,无 Routing Key。

    • Direct:精确 Routing Key,定点/多播。

    • Topic:通配符 Routing Key。

RabbitMQ 基础至此结束,后续探讨高级主题如消息可靠性和集群。

  • 微信
  • 赶快加我聊天吧
  • QQ
  • 赶快加我聊天吧
  • weinxin
三桂

发表评论 取消回复 您未登录,登录后才能评论,前往登录