RocketMQ学习(一)
一、MQ简介
1.什么是消息队列(MQ)?
消息队列是一种异步的进程间通信(IPC)方式。它本质上是一个存储在内核或独立进程(如RabbitMQ、Kafka等)中的队列(先进先出的数据结构)。
它的基本模型很简单,通常包含两个角色:
- 生产者(Producer):负责创建消息并发送到消息队列。
- 消费者(Consumer):负责从消息队列中获取消息并进行处理。
消息(Message)就是在生产者和消费者之间传输的数据,可以是任何格式,如JSON、XML、纯文本等。
2.主要作用:解耦、异步、削峰
消息队列的核心价值可以通过三个关键词来理解:
- 解耦(Decoupling)
- 问题:在传统同步调用中,服务A调用服务B,如果服务B宕机或修改接口,服务A会直接失败或需要跟着修改。服务之间是紧耦合的。
- MQ的解决方案:服务A只需要将消息发送到MQ,就认为任务完成了,不需要知道由谁、何时来处理。服务B也只需要从MQ中取消息处理,不需要知道消息来自哪里。这样,服务A和服务B就完全解耦了,可以独立开发、部署和扩展。
- 异步(Asynchronous)
- 问题:同步调用中,服务A调用服务B,必须等待服务B处理完成并返回结果后,才能继续执行。这段时间服务A是被阻塞的,如果服务B处理很慢,会导致服务A的性能急剧下降。
- MQ的解决方案:服务A发送消息到MQ后立即返回,可以继续处理后续任务,无需等待。服务B可以稍后异步地从MQ中取出消息处理。这极大地提高了系统的吞吐量和响应速度。
- 削峰填谷(Peak Shaving)
- 问题:系统通常会遇到流量高峰(如秒杀、抢购)。如果所有请求都直接打到处理服务上,服务很可能因为瞬间压力过大而崩溃。
- MQ的解决方案:MQ就像一个巨大的“缓冲池”或“水库”。突如其来的大量请求可以先堆积在MQ中,后端的处理服务可以按照自己能够承受的速度,平稳地从MQ中消费消息。这样既避免了服务被冲垮,也充分利用了系统资源。
二、RocketMQ消息队列
RocketMQ 是一款开源的分布式消息中间件,由阿里巴巴团队基于多年的高并发业务场景和技术沉淀开发,并于 2016 年捐赠给 Apache 软件基金会,次年成为 Apache 顶级项目(Top-Level Project)。它以其高吞吐量、低延迟、高可用、高可靠、万亿级消息容量和卓越的可扩展性而闻名,是应对金融级、电商等苛刻场景的优选方案。
1.核心架构与概念
要理解 RocketMQ,首先要了解其核心组件和设计理念。
核心组件
一个典型的 RocketMQ 集群包含以下四个核心组件:
**NameServer (命名服务器)**:
- 角色: 轻量级的发现服务,类似于 ZooKeeper 或 Consul,但更简单、无状态。
- 功能: 管理整个集群的元数据(Metadata)。主要包括:
- Broker 的列表和状态。
- Topic 的路由信息(每个 Topic 分布在哪些 Broker 上)。
- 工作方式: Broker 启动时会向所有 NameServer 注册;Producer 和 Consumer 启动时连接到 NameServer,获取路由信息,然后直接与 Broker 通信。NameServer 集群节点之间互不通信,极大地简化了设计,保证了高可用。
**Broker (代理服务器)**:
- 角色: 消息中转角色,负责存储和传输消息,是真正干活的组件。
- 功能: 接收来自 Producer 的消息并存储;处理 Consumer 的拉取请求,推送或供其拉取消息。
- 高可用: 通常采用主从架构(Master-Slave)。Master 负责处理读写请求,Slave 从 Master 同步数据,作为热备。当 Master 宕机后,Consumer 可以从 Slave 读取消息,保证消息不丢(但不支持自动故障转移下的写入,需要人工干预)。
**Producer (生产者)**:
- 消息的发送方。它从 NameServer 获取 Broker 地址,然后与 Broker 建立连接,将消息发送到指定的 Topic。
**Consumer (消费者)**:
- 消息的接收方。同样从 NameServer 获取 Broker 地址,连接到 Broker 并订阅其感兴趣的 Topic 消息。RocketMQ 支持两种消费模式:
- Push模式: Broker 收到新消息后主动推送给 Consumer(实质是 Consumer 拉取消息的长轮询模拟)。
- Pull模式: Consumer 主动从 Broker 拉取消息。
- 消息的接收方。同样从 NameServer 获取 Broker 地址,连接到 Broker 并订阅其感兴趣的 Topic 消息。RocketMQ 支持两种消费模式:
核心概念
- **Topic (主题)**: 消息的逻辑分类,生产者向指定 Topic 发送消息,消费者订阅指定 Topic 来消费消息。
- **Message (消息)**: 传输的基本单位。包含 Topic、Body(消息体)、Tags(标签,用于进一步过滤)、Keys(唯一标识)等属性。
- **Message Queue (消息队列)**: 这是 RocketMQ 存储和并行化的核心。一个 Topic 在物理上会被分为一个或多个 Queue(默认4个)。Queue 是消息存储的最小单元,消息会被顺序写入 Queue。
- 并行消费: 正是因为一个 Topic 被分成了多个 Queue,Producer 可以并行地向多个 Queue 发送消息,Consumer 也可以以多个线程/实例并行地消费多个 Queue,从而实现极高的吞吐量。
- **Tag (标签)**: 位于消息内部,是 Topic 下的次级分类。消费者可以基于 Topic + Tag 进行更细粒度的消息过滤。
- **Group (组)**:
- Producer Group: 同一类Producer的集合,发送同一类消息的Producer划分为一个组。
- Consumer Group: 同一类Consumer的集合,消费逻辑完全相同的消费者属于同一个组。同一个 Consumer Group 内的消费者以负载均衡的方式共同消费其订阅的 Topic 的消息(每条消息只会被组内的一个消费者消费,即集群模式)。
2.核心特性与优势
- 高吞吐与低延迟: 采用零拷贝(Zero-copy)等技术优化网络和磁盘IO,在海量消息积压下依然能保持稳定的低延迟。
- 金融级高可用:
- Broker 主从架构: 支持同步/异步复制,保证消息不丢失。
- 消息持久化: 所有消息都持久化到磁盘。
- 丰富的故障恢复机制。
- 消息可靠性: 提供至少一次(At Least Once) 的投递语义,通过事务消息等机制可以最大努力实现最终一致性。
- 万亿级消息堆积能力: 采用单一、连续磁盘文件结构存储所有消息,具有极高的存储效率,支持海量消息长时间堆积而不影响性能。
- 灵活的扩展性: 所有组件(NameServer, Broker, Producer, Consumer)均可水平扩展。
- 丰富的消息类型:
- 顺序消息: 保证消息在同一个 Queue 内严格按照 FIFO 的顺序被消费(如保证同一个订单的创建、付款、发货消息顺序)。
- 事务消息: 支持分布式事务,通过“半消息”和事务状态回查机制,保证本地事务和消息发送的最终一致性。
- 定时/延时消息: 消息发送后不会立即被消费,而是在指定的延迟时间或绝对时间后被投递。
- 批量消息: 一次性发送多条消息,减少网络开销。
- 过滤消息: 支持基于 Tag 或自定义属性的消息过滤。
3.典型应用场景
- 异步解耦: 电商订单系统,下单后写入消息,由库存、物流、积分等系统异步消费,实现核心流程与周边流程的解耦。
- 削峰填谷: 秒杀活动。将瞬间的海量请求缓冲到 RocketMQ 中,后端服务按照自己的能力匀速消费,防止系统被冲垮。
- 顺序消息: 保证证券交易、订单状态变更等有严格顺序要求的业务场景。
- 分布式事务: 通过事务消息,实现跨系统的事务最终一致性。
- 数据同步: 作为数据总线,实时将业务数据变更同步到缓存(如Redis)、搜索索引(如Elasticsearch)或大数据平台。
4.与其他MQ的简单对比
特性 | RocketMQ | Kafka | RabbitMQ |
---|---|---|---|
诞生背景 | 阿里巴巴/电商、金融 | LinkedIn/大数据日志 | Mozilla/企业级集成 |
吞吐量 | 极高(十万级) | 极高(百万级,大数据领域更强) | 一般(万级) |
延迟 | 低 | 低 | 极低 |
可靠性 | 高(金融级) | 高(副本机制) | 高 |
消息堆积 | 极强(万亿级) | 强(依赖磁盘大小) | 差(内存耗尽后性能骤降) |
主要优势 | 综合能力强,高可用,顺序/事务消息 | 吞吐量极致,生态强大(流处理) | 协议支持多(AMQP),消息路由灵活 |
典型场景 | 电商、金融交易、业务集成 | 日志处理、流式计算、大数据管道 | 企业应用集成、对延迟敏感的业务 |
三、RocketMQ快速实战
1.快速搭建RocketMQ服务(Windows)
下载RocketMQ服务
RocketMQ官网:RocketMQ · 官方网站 | RocketMQ,下载地址:下载 | RocketMQ。
配置环境变量
解压后,配置环境变量ROCKETMQ_HOME:运行包根目录;配置环境变量NAMESRV_ADDR:localhost:9876。
运行服务
运行**/bin目前下,mqnamesrv和mqbroker**。
2.运行dashboard可视化仪表板
下载RocketMQ Dashboard源码
在RocketMQ下载地址,最下方找到RocketMQ Dashboard下载,下载源码。
打包源码
RocketMQ Dashboard是一个Spring Boot项目,使用命令将源码打包成jar包:
1 | mvn clean package -Dmaven.test.skip=true |
运行jar包
然后再运行jar包,启动dashboard。
3.工具测试消息收发
发送消息
1 | tools org.apache.rocketmq.example.quickstart.Producer |
消费消息
1 | tools org.apache.rocketmq.example.quickstart.Consumer |
4.SDK测试消息收发
在IDEA中创建一个Java工程。
在 pom.xml 文件中添加以下依赖引入Java依赖库
1
2
3
4
5<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.8</version>
</dependency>在已创建的Java工程中,创建发送普通消息程序并运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class ProducerTest {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
String endpoint = "localhost:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}在已创建的Java工程中,创建订阅普通消息程序并运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class ConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(ConsumerTest.class);
private ConsumerTest() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "localhost:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}