Integrating RocketMQ with Spring Boot
1. Add the Maven dependency
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.3.4</version>
    </dependency>
2. Add the configuration
    rocketmq:
      name-server: "127.0.0.1:9876"
      producer:
        group: "MyTestGroup"
3. Create a RocketMQTemplate
You can inject the template that Spring Boot has already instantiated:
    @Resource
    private RocketMQTemplate rocketMQTemplate;
You can also define a template of your own:
    @ExtRocketMQTemplateConfiguration()
    public class EXTProducer1 extends RocketMQTemplate {
    }
4. Create a consumer
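    import java.nio.charset.StandardCharsets;

    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(
            consumerGroup = "MyTestGroup",
            topic = "MyTopic",
            consumeMode = ConsumeMode.ORDERLY)
    public class MyConsumer1 implements RocketMQListener<MessageExt> {

        private static final Logger log = LoggerFactory.getLogger(MyConsumer1.class);

        @Override
        public void onMessage(MessageExt messageExt) {
            // Decode the raw body explicitly as UTF-8.
            String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            String tags = messageExt.getTags();
            String topic = messageExt.getTopic();
            log.info("message - topic:{}, tags:{}, body:{}", topic, tags, messageBody);
        }
    }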
5. Normal messages
One-way send
Sends the message and returns immediately, without waiting for a result from the broker.
    // Assumes extProducer1 is an injected EXTProducer1 bean (see step 3).
    @RequestMapping("/sendOneWay")
    public void sendOneWay(String message) {
        Message<String> msgs = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, "key1")
                .build();
        // Destination format is "topic:tag".
        extProducer1.sendOneWay("MyTopic:A", msgs);
        log.info("send one way success");
    }
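The destination strings used in these examples, such as "MyTopic:A", combine a topic and a tag separated by a colon; the tag is what the consumer later reads through getTags(). As a rough illustration of that convention, here is a minimal JDK-only sketch. The Destination class and its parse method are hypothetical names made up for this example and are not part of rocketmq-spring.

```java
// Hypothetical helper, not part of rocketmq-spring: illustrates the "topic:tag" convention.
final class Destination {
    final String topic;
    final String tags; // empty string when the destination carries no tag

    private Destination(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    // Split on the first ':' only, so everything after it is treated as the tag part.
    static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        String tags = parts.length > 1 ? parts[1] : "";
        return new Destination(parts[0], tags);
    }

    public static void main(String[] args) {
        Destination d = Destination.parse("MyTopic:A");
        System.out.println(d.topic + " / " + d.tags); // prints "MyTopic / A"
    }
}
```

A destination without a colon, such as "MyTopic", yields an empty tag in this sketch.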
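Synchronous send
Blocks the current thread until the broker returns a result.
On success, the returned SendResult contains the send status of the message.
On failure, a MessagingException is thrown.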
    @RequestMapping("/syncSend")
    public void syncSend(String message) {
        Message<String> msgs = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, "key2")
                .build();
        // The third argument is the send timeout in milliseconds.
        SendResult sendResult = extProducer1.syncSend("MyTopic:B", msgs, 20000);
        log.info("sync send result: {}", sendResult.getSendStatus());
    }
Asynchronous send
Does not block the current thread. The callback runs on a separate thread and handles the success or failure result.
    @RequestMapping("/asyncSend")
    public void asyncSend(String message) {
        Message<String> msgs = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, "key3")
                .build();
        extProducer1.asyncSend("MyTopic:C", msgs, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("async send result: {}", sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("async send failed: {}", throwable.getMessage());
            }
        });
        // Reached before the callback fires: the send has only been submitted here.
        log.info("async send submitted");
    }
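To make the difference between the three send modes concrete, the following is a toy model built only on the JDK's CompletableFuture. It does not use RocketMQ at all: ToyProducer, its methods, and the "SEND_OK:" status string are hypothetical stand-ins chosen for this sketch, and the "broker" is just a single background thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Toy stand-in for a producer; every name here is hypothetical and unrelated to the RocketMQ API.
final class ToyProducer {
    // Single daemon thread that plays the role of the network/broker side.
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io");
        t.setDaemon(true);
        return t;
    });

    // Pretends to deliver the message and yields a status once it is "acknowledged".
    private CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + body, io);
    }

    // One-way: hand the message off and return at once; the outcome is never observed.
    void sendOneWay(String body) {
        deliver(body);
    }

    // Sync: block the caller until the acknowledgement arrives or the timeout expires.
    String syncSend(String body, long timeoutMillis) {
        try {
            return deliver(body).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    // Async: return immediately; exactly one of the callbacks runs later on the I/O thread.
    CompletableFuture<String> asyncSend(String body,
                                        Consumer<String> onSuccess,
                                        Consumer<Throwable> onException) {
        return deliver(body).whenCompleteAsync((status, error) -> {
            if (error != null) {
                onException.accept(error);
            } else {
                onSuccess.accept(status);
            }
        }, io);
    }

    void shutdown() {
        io.shutdown();
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();
        producer.sendOneWay("a");
        System.out.println(producer.syncSend("b", 1000));
        producer.asyncSend("c",
                status -> System.out.println("callback: " + status),
                error -> System.out.println("failed: " + error.getMessage()))
            .join();
        producer.shutdown();
    }
}
```

In this model, one-way is the cheapest call but gives no delivery feedback, sync gives the result at the cost of blocking the caller, and async keeps the caller free while still reporting the outcome through a callback.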
6. Ordered messages
7. Transactional messages