Integrating RocketMQ with Spring Boot

1. Add the Maven dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.4</version>
</dependency>

2. Add the configuration

rocketmq:
  name-server: "127.0.0.1:9876"
  producer:
    group: "MyTestGroup"

3. Create a RocketMQTemplate

You can inject the RocketMQTemplate that Spring Boot has already instantiated:

@Resource
private RocketMQTemplate rocketMQTemplate;

You can also define a template of your own:

@ExtRocketMQTemplateConfiguration()
public class EXTProducer1 extends RocketMQTemplate {
}

4. Create a consumer

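import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        consumerGroup = "MyTestGroup",
        topic = "MyTopic",
        consumeMode = ConsumeMode.ORDERLY)
public class MyConsumer1 implements RocketMQListener<MessageExt> {
    private static final Logger log = LoggerFactory.getLogger(MyConsumer1.class);

    @Override
    public void onMessage(MessageExt messageExt) {
        // The body arrives as raw bytes, so decode it explicitly as UTF-8.
        String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        String tags = messageExt.getTags();
        String topic = messageExt.getTopic();
        log.info("message - topic:{}, tags:{}, body:{}", topic, tags, messageBody);
    }
}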

5. Normal messages

One-way send

The message is sent directly, without waiting for a result from the broker.

@RequestMapping("/sendOneWay")
public void sendOneWay(String message) {
    Message<String> msgs = MessageBuilder.withPayload(message)
            .setHeader(RocketMQHeaders.KEYS, "key1")
            .build();
    // Destination format is "topic:tag".
    extProducer1.sendOneWay("MyTopic:A", msgs);

    // A one-way send returns no result, so this only confirms the call was made.
    log.info("one-way message sent");
}
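
The destination strings in these examples ("MyTopic:A", "MyTopic:B", "MyTopic:C") follow the topic:tag convention, which is why the consumer above can read the tag back with getTags(). As far as I can tell, rocketmq-spring splits the string on the first colon only and treats a missing tag as empty. The sketch below mirrors that behaviour in plain Java; the Destination class is a hypothetical helper written for illustration, not part of the library.

```java
// Hypothetical helper, not a rocketmq-spring class: it only illustrates
// how a "topic:tag" destination string breaks down.
final class Destination {
    final String topic;
    final String tags;

    private Destination(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    /** Splits on the first ':' only; a destination without ':' has empty tags. */
    static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        String tags = parts.length > 1 ? parts[1] : "";
        return new Destination(parts[0], tags);
    }
}
```

With this, Destination.parse("MyTopic:A") yields topic "MyTopic" and tags "A", while Destination.parse("MyTopic") yields the same topic with empty tags.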

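Synchronous send

Blocks the current thread until the broker returns a result.

On success, the returned SendResult contains the send status of the message.

On failure, a MessagingException is thrown.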

@RequestMapping("/syncSend")
public void syncSend(String message) {
    Message<String> msgs = MessageBuilder.withPayload(message)
            .setHeader(RocketMQHeaders.KEYS, "key2")
            .build();
    // The third argument is the send timeout in milliseconds.
    SendResult sendResult = extProducer1.syncSend("MyTopic:B", msgs, 20000);

    log.info("sync send result: {}", sendResult.getSendStatus());
}

Asynchronous send

Does not block the current thread. The callback runs on a separate thread and handles either success or failure.

@RequestMapping("/asyncSend")
public void asyncSend(String message) {
    Message<String> msgs = MessageBuilder.withPayload(message)
            .setHeader(RocketMQHeaders.KEYS, "key3")
            .build();
    extProducer1.asyncSend("MyTopic:C", msgs, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("async send result: {}", sendResult.getSendStatus());
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("async send failed: {}", throwable.getMessage());
        }
    });
    // Reached before the broker replies; the outcome is reported in the callback.
    log.info("async send submitted");
}
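
The three send modes differ mainly in how the caller learns the outcome. The sketch below models that difference with a simulated broker built on CompletableFuture. It is a plain-Java illustration under stated assumptions, not RocketMQ code: SendModes, deliver, and the "SEND_OK" string are all hypothetical stand-ins, and an empty body is used as an arbitrary failure trigger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Stand-in for a broker client; every name here is hypothetical, not RocketMQ API.
final class SendModes {

    /** Simulates the broker acknowledging a message on a pool thread. */
    static CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> {
            if (body.isEmpty()) {
                throw new IllegalArgumentException("empty message body");
            }
            return "SEND_OK";
        });
    }

    /** One-way: hand the message off and never look at the outcome. */
    static void sendOneWay(String body) {
        deliver(body);
    }

    /** Sync: block the caller until the ack arrives or the timeout elapses. */
    static String syncSend(String body, long timeoutMillis) {
        try {
            return deliver(body).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    /**
     * Async: return at once; the callback is invoked on a pool thread with
     * either a status or an error. The returned future completes after the
     * callback has run, which is convenient for tests.
     */
    static CompletableFuture<Void> asyncSend(String body,
                                             BiConsumer<String, Throwable> callback) {
        return deliver(body).<Void>handleAsync((status, error) -> {
            callback.accept(status, error);
            return null;
        });
    }
}
```

In this model, sendOneWay corresponds to the sendOneWay endpoint above, syncSend to the blocking call that either returns a status or throws, and asyncSend to the SendCallback pair of onSuccess and onException. A failed one-way send goes unnoticed by the caller, which is the trade-off that mode makes for lower latency.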

6. Ordered messages

7. Transactional messages