一、MQ简介

1.什么是消息队列(MQ)?

消息队列是一种异步的进程间通信(IPC)方式。它本质上是一个存储在内核或独立进程(如RabbitMQ、Kafka等)中的队列(先进先出的数据结构)。

它的基本模型很简单,通常包含两个角色:

  1. 生产者(Producer):负责创建消息并发送到消息队列。
  2. 消费者(Consumer):负责从消息队列中获取消息并进行处理。

消息(Message)就是在生产者和消费者之间传输的数据,可以是任何格式,如JSON、XML、纯文本等。

2.主要作用:解耦、异步、削峰

消息队列的核心价值可以通过三个关键词来理解:

  1. 解耦(Decoupling)
    • 问题:在传统同步调用中,服务A调用服务B,如果服务B宕机或修改接口,服务A会直接失败或需要跟着修改。服务之间是紧耦合的。
    • MQ的解决方案:服务A只需要将消息发送到MQ,就认为任务完成了,不需要知道由谁、何时来处理。服务B也只需要从MQ中取消息处理,不需要知道消息来自哪里。这样,服务A和服务B就完全解耦了,可以独立开发、部署和扩展。
  2. 异步(Asynchronous)
    • 问题:同步调用中,服务A调用服务B,必须等待服务B处理完成并返回结果后,才能继续执行。这段时间服务A是被阻塞的,如果服务B处理很慢,会导致服务A的性能急剧下降。
    • MQ的解决方案:服务A发送消息到MQ后立即返回,可以继续处理后续任务,无需等待。服务B可以稍后异步地从MQ中取出消息处理。这极大地提高了系统的吞吐量和响应速度。
  3. 削峰填谷(Peak Shaving)
    • 问题:系统通常会遇到流量高峰(如秒杀、抢购)。如果所有请求都直接打到处理服务上,服务很可能因为瞬间压力过大而崩溃。
    • MQ的解决方案:MQ就像一个巨大的“缓冲池”或“水库”。突如其来的大量请求可以先堆积在MQ中,后端的处理服务可以按照自己能够承受的速度,平稳地从MQ中消费消息。这样既避免了服务被冲垮,也充分利用了系统资源。

二、RocketMQ消息队列

RocketMQ 是一款开源的分布式消息中间件,由阿里巴巴团队基于多年的高并发业务场景和技术沉淀开发,并于 2016 年捐赠给 Apache 软件基金会,次年成为 Apache 顶级项目(Top-Level Project)。它以其高吞吐量、低延迟、高可用、高可靠、万亿级消息容量和卓越的可扩展性而闻名,是应对金融级、电商等苛刻场景的优选方案。


1.核心架构与概念

要理解 RocketMQ,首先要了解其核心组件和设计理念。

核心组件

一个典型的 RocketMQ 集群包含以下四个核心组件:

  • **NameServer (命名服务器)**:

    • 角色: 轻量级的发现服务,类似于 ZooKeeper 或 Consul,但更简单、无状态。
    • 功能: 管理整个集群的元数据(Metadata)。主要包括:
      • Broker 的列表和状态。
      • Topic 的路由信息(每个 Topic 分布在哪些 Broker 上)。
    • 工作方式: Broker 启动时会向所有 NameServer 注册;Producer 和 Consumer 启动时连接到 NameServer,获取路由信息,然后直接与 Broker 通信。NameServer 集群节点之间互不通信,极大地简化了设计,保证了高可用。
  • **Broker (代理服务器)**:

    • 角色: 消息中转角色,负责存储和传输消息,是真正干活的组件。
    • 功能: 接收来自 Producer 的消息并存储;处理 Consumer 的拉取请求,推送或供其拉取消息。
    • 高可用: 通常采用主从架构(Master-Slave)。Master 负责处理读写请求,Slave 从 Master 同步数据,作为热备。当 Master 宕机后,Consumer 可以从 Slave 读取消息,保证消息不丢(但不支持自动故障转移下的写入,需要人工干预)。
  • **Producer (生产者)**:

    • 消息的发送方。它从 NameServer 获取 Broker 地址,然后与 Broker 建立连接,将消息发送到指定的 Topic。
  • **Consumer (消费者)**:

    • 消息的接收方。同样从 NameServer 获取 Broker 地址,连接到 Broker 并订阅其感兴趣的 Topic 消息。RocketMQ 支持两种消费模式:
      • Push模式: Broker 收到新消息后主动推送给 Consumer(实质是 Consumer 拉取消息的长轮询模拟)。
      • Pull模式: Consumer 主动从 Broker 拉取消息。

核心概念

  • **Topic (主题)**: 消息的逻辑分类,生产者向指定 Topic 发送消息,消费者订阅指定 Topic 来消费消息。
  • **Message (消息)**: 传输的基本单位。包含 Topic、Body(消息体)、Tags(标签,用于进一步过滤)、Keys(唯一标识)等属性。
  • **Message Queue (消息队列)**: 这是 RocketMQ 存储和并行化的核心。一个 Topic 在物理上会被分为一个或多个 Queue(默认4个)。Queue 是消息存储的最小单元,消息会被顺序写入 Queue。
    • 并行消费: 正是因为一个 Topic 被分成了多个 Queue,Producer 可以并行地向多个 Queue 发送消息,Consumer 也可以以多个线程/实例并行地消费多个 Queue,从而实现极高的吞吐量。
  • **Tag (标签)**: 位于消息内部,是 Topic 下的次级分类。消费者可以基于 Topic + Tag 进行更细粒度的消息过滤。
  • **Group (组)**:
    • Producer Group: 同一类Producer的集合,发送同一类消息的Producer划分为一个组。
    • Consumer Group: 同一类Consumer的集合,消费逻辑完全相同的消费者属于同一个组。同一个 Consumer Group 内的消费者以负载均衡的方式共同消费其订阅的 Topic 的消息(每条消息只会被组内的一个消费者消费,即集群模式)。

2.核心特性与优势

  1. 高吞吐与低延迟: 采用零拷贝(Zero-copy)等技术优化网络和磁盘IO,在海量消息积压下依然能保持稳定的低延迟。
  2. 金融级高可用
    • Broker 主从架构: 支持同步/异步复制,保证消息不丢失。
    • 消息持久化: 所有消息都持久化到磁盘。
    • 丰富的故障恢复机制
  3. 消息可靠性: 提供至少一次(At Least Once) 的投递语义,通过事务消息等机制可以最大努力实现最终一致性。
  4. 万亿级消息堆积能力: 采用单一、连续磁盘文件结构存储所有消息,具有极高的存储效率,支持海量消息长时间堆积而不影响性能。
  5. 灵活的扩展性: 所有组件(NameServer, Broker, Producer, Consumer)均可水平扩展。
  6. 丰富的消息类型
    • 顺序消息: 保证消息在同一个 Queue 内严格按照 FIFO 的顺序被消费(如保证同一个订单的创建、付款、发货消息顺序)。
    • 事务消息: 支持分布式事务,通过“半消息”和事务状态回查机制,保证本地事务和消息发送的最终一致性。
    • 定时/延时消息: 消息发送后不会立即被消费,而是在指定的延迟时间或绝对时间后被投递。
    • 批量消息: 一次性发送多条消息,减少网络开销。
    • 过滤消息: 支持基于 Tag 或自定义属性的消息过滤。

3.典型应用场景

  1. 异步解耦: 电商订单系统,下单后写入消息,由库存、物流、积分等系统异步消费,实现核心流程与周边流程的解耦。
  2. 削峰填谷: 秒杀活动。将瞬间的海量请求缓冲到 RocketMQ 中,后端服务按照自己的能力匀速消费,防止系统被冲垮。
  3. 顺序消息: 保证证券交易、订单状态变更等有严格顺序要求的业务场景。
  4. 分布式事务: 通过事务消息,实现跨系统的事务最终一致性。
  5. 数据同步: 作为数据总线,实时将业务数据变更同步到缓存(如Redis)、搜索索引(如Elasticsearch)或大数据平台。

4.与其他MQ的简单对比

特性 RocketMQ Kafka RabbitMQ
诞生背景 阿里巴巴/电商、金融 LinkedIn/大数据日志 Mozilla/企业级集成
吞吐量 极高(十万级) 极高(百万级,大数据领域更强) 一般(万级)
延迟 极低
可靠性 (金融级) 高(副本机制)
消息堆积 极强(万亿级) 强(依赖磁盘大小) 差(内存耗尽后性能骤降)
主要优势 综合能力强,高可用,顺序/事务消息 吞吐量极致,生态强大(流处理) 协议支持多(AMQP),消息路由灵活
典型场景 电商、金融交易、业务集成 日志处理、流式计算、大数据管道 企业应用集成、对延迟敏感的业务

三、RocketMQ快速实战

1.快速搭建RocketMQ服务(Windows)

下载RocketMQ服务

RocketMQ官网:RocketMQ · 官方网站 | RocketMQ,下载地址:下载 | RocketMQ

配置环境变量

解压后,配置环境变量ROCKETMQ_HOME:运行包根目录;配置环境变量NAMESRV_ADDR:localhost:9876。

运行服务

运行**/bin目前下,mqnamesrvmqbroker**。

2.运行dashboard可视化仪表板

下载RocketMQ Dashboard源码

在RocketMQ下载地址,最下方找到RocketMQ Dashboard下载,下载源码。

打包源码

RocketMQ Dashboard是一个Spring Boot项目,使用命令将源码打包成jar包:

1
mvn clean package -Dmaven.test.skip=true 

运行jar包

然后再运行jar包,启动dashboard。

3.工具测试消息收发

发送消息

1
tools org.apache.rocketmq.example.quickstart.Producer

消费消息

1
tools org.apache.rocketmq.example.quickstart.Consumer

4.SDK测试消息收发

  1. 在IDEA中创建一个Java工程。

  2. pom.xml 文件中添加以下依赖引入Java依赖库

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.8</version>
    </dependency>
  3. 在已创建的Java工程中,创建发送普通消息程序并运行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public class ProducerTest {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws ClientException {
    // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
    String endpoint = "localhost:8081";
    // 消息发送的目标Topic名称,需要提前创建。
    String topic = "TestTopic";
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
    ClientConfiguration configuration = builder.build();
    // 初始化Producer时需要设置通信配置以及预绑定的Topic。
    Producer producer = provider.newProducerBuilder()
    .setTopics(topic)
    .setClientConfiguration(configuration)
    .build();
    // 普通消息发送。
    Message message = provider.newMessageBuilder()
    .setTopic(topic)
    // 设置消息索引键,可根据关键字精确查找某条消息。
    .setKeys("messageKey")
    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
    .setTag("messageTag")
    // 消息体。
    .setBody("messageBody".getBytes())
    .build();
    try {
    // 发送消息,需要关注发送结果,并捕获失败等异常。
    SendReceipt sendReceipt = producer.send(message);
    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (ClientException e) {
    logger.error("Failed to send message", e);
    }
    // producer.close();
    }
    }
  4. 在已创建的Java工程中,创建订阅普通消息程序并运行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public class ConsumerTest {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerTest.class);

    private ConsumerTest() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
    String endpoints = "localhost:8081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .build();
    // 订阅消息的过滤规则,表示订阅所有Tag的消息。
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // 为消费者指定所属的消费者分组,Group需要提前创建。
    String consumerGroup = "YourConsumerGroup";
    // 指定需要订阅哪个目标Topic,Topic需要提前创建。
    String topic = "TestTopic";
    // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // 设置消费者分组。
    .setConsumerGroup(consumerGroup)
    // 设置预绑定的订阅关系。
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    // 设置消费监听器。
    .setMessageListener(messageView -> {
    // 处理消息并返回消费结果。
    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
    return ConsumeResult.SUCCESS;
    })
    .build();
    Thread.sleep(Long.MAX_VALUE);
    // 如果不需要再使用 PushConsumer,可关闭该实例。
    // pushConsumer.close();
    }
    }