1. 概述
本文我们来学习 Spring Cloud Alibaba 提供的 Spring Cloud Stream RocketMQ 组件,基于 Spring Cloud Stream 的编程模型,接入 RocketMQ 作为消息中间件,实现消息驱动的微服务。
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
在开始本文之前,胖友需要对 RocketMQ 进行简单的学习。可以阅读《RocketMQ 极简入门》文章,将第一二小节看完,在本机搭建一个 RocketMQ 服务。
2. Spring Cloud Stream 介绍
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。
友情提示:可能有胖友对 Broker 不太了解,我们来简单解释下。
一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。
例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
① Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。
② Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
最终整体交互如下图所示:
可能看完之后,胖友对 Spring Cloud Stream 还是有点懵逼,并且觉得概念怎么这么多呢?不要慌,我们先来快速入个门,会有更加具象的感受。
3. 快速入门
本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示:
3.1 搭建生产者
3.1.1 引入依赖
创建 pom.xml 文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。
通过引入 spring-cloud-starter-stream-rocketmq 依赖,引入并实现 RocketMQ 的自动配置。在该依赖中,已经帮我们自动引入 RocketMQ 的大量依赖,非常方便,如下图所示:
3.1.2 配置文件
创建 application.yaml 配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。
spring.cloud.stream
spring.cloud.stream.bindings@Input@Output
demo01-output
destinationDEMO-TOPIC-01
主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
content-type
spring.cloud.stream.rocketmq
spring.cloud.stream.rocketmq.binder
name-server
名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。
spring.cloud.stream.rocketmq.bindingsspring.cloud.stream.bindings
demo01-outputproducer
group
生产者组(Producer Group):同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
syncfalsetrue
使用 RocketMQ 发送三种类型的消息:同步消息(sync)、异步消息(async)和单向消息(oneway)。其中前两种消息是可靠的,因为会有发送是否成功的应答。
3.1.3 MySource
创建 MySource 接口,声明名字为 Output Binding。代码如下:
demo01-outputspring.cloud.stream.bindings
@Output
@Output@Output
#demo01Output()
#afterPropertiesSet()#invoke(MethodInvocation invocation)
3.1.4 Demo01Message
创建 Demo01Message 类,示例 Message 消息。代码如下:
3.1.5 Demo01Controller
创建 Demo01Controller 类,提供发送消息的 HTTP 接口。代码如下:
@Autowired<1><2><3>
3.1.6 ProducerApplication
创建 ProducerApplication 类,启动应用。代码如下:
@Input@Output
3.2 搭建消费者
3.2.1 引入依赖
创建 pom.xml 文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。
3.2.2 配置文件
创建 application.yaml 配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。
总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。
spring.cloud.stream.bindings
demo01-input
group
消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
spring.cloud.stream.rocketmq.bindings
demo01-inputconsumer
enabledtrueenabledfalsebroadcastingfalse
- 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 广播消费(Broadcasting):广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
这里一点要注意!!!艿艿加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:
"consumer_group_01""consumer_group_02"
"USER_REGISTER"
- 积分模块:判断如果是手机注册,给用户增加 20 积分。
- 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
- 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
- ... 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度。
友情提示:如果还不理解的话,没有关系,我们下面会演示下我们上面举的例子。
3.2.3 MySink
创建 MySink 接口,声明名字为 Input Binding。代码如下:
demo01-inputspring.cloud.stream.bindings
@Input
@Input@Input
#demo01Input()
#afterPropertiesSet()#invoke(MethodInvocation invocation)
3.2.4 Demo01Message
创建 Demo01Message 类,示例 Message 消息。
3.2.5 Demo01Consumer
创建 Demo01Consumer 类,消费消息。代码如下:
MySink.DEMO01_INPUT
又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload 注解,声明需要进行反序列化成 POJO 对象。
3.2.6 ConsumerApplication
创建 ConsumerApplication 类,启动应用。代码如下:
@Input@Output
3.3 测试单集群多实例的场景
本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。
demo01-consumer-group-DEMO-TOPIC-01
Allow parallel run
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:
符合预期。从日志可以看出,每条消息仅被消费一次。
3.4 测试多集群多实例的场景
本小节,我们会在二个消费者集群各启动两个实例,测试在集群消费的情况下的表现。
demo01-consumer-group-DEMO-TOPIC-01
labx-06-sca-stream-rocketmq-consumer-demospring.cloud.stream.bindings.demo01-input.groupX-demo01-consumer-group-DEMO-TOPIC-01
X-demo01-consumer-group-DEMO-TOPIC-01
③ 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:
符合预期。从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。
3.5 小结
至此,我们已经完成了 Stream RocketMQ 的快速入门,是不是还是蛮简答的噢。现在胖友可以在回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。
4. 定时消息
在 RocketMQ 中,提供定时消息的功能。
定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:
延迟级别 | 时间 | 延迟级别 | 时间 | 延迟级别 | 时间 |
1 | 1s | 7 | 3m | 13 | 9m |
2 | 5s | ![]() 8 | 4m | 14 | 10m |
3 | 10s | 9 | 5m | 15 | 20m |
4 | 30s | 10 | 6m | 16 | 30m |
5 | 1m | 11 | 7m | 17 | 1h |
6 | 2m | 12 | 8m | 18 | 2h |
如果胖友想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列)。
下面,我们来搭建一个 RocketMQ 定时消息的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,修改 labx-06-sca-stream-rocketmq-producer-demo 发送定时消息,继续使用 labx-06-sca-stream-rocketmq-consumer-demo 消费消息。
4.1 Demo01Controller
修改 Demo01Controller 类,增发送定时消息的 HTTP 接口。代码如下:
MessageConst.PROPERTY_DELAY_TIME_LEVEL
4.2 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_delay 接口,发送延迟 10 秒的定时消息。IDEA 控制台输出日志如下:
符合预期。在 Producer 发送的消息之后,Consumer 确实 10 秒后才消费消息。
5. 消费重试
RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
每条消息的失败重试,是有一定的间隔时间。实际上,消费重试是基于「5. 定时消息」 来实现,第一次重试消费按照延迟级别为 3 开始。😈 所以,默认为 16 次重试消费,也非常好理解,毕竟延迟级别最高为 18 呀。
不过要注意,只有集群消费模式下,才有消息重试。
下面,我们来搭建一个 RocketMQ 消息重试的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,使用 labx-06-sca-stream-rocketmq-producer-demo 发送消息,从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-retry 来模拟消费失败后的重试。
5.1 复制项目
将「2. 快速入门」小节的 labx-06-sca-stream-rocketmq-consumer-demo,复制出 labx-06-sca-stream-rocketmq-consumer-retry。
5.2 配置文件
delay-level-when-next-consumemax-attempts
delay-level-when-next-consume
- -1:不重复,直接放入死信队列
- 0:RocketMQ Broker 控制重试策略
0:RocketMQ Consumer 控制重试策略
可能胖友对 Broker 和 Consumer 控制重试策略有点懵逼!?每天消息首次消费失败时,Consumer 会发回给 Broker,并告诉 Broker 按照什么延迟级别开始,不断重新投递给 Consumer 直到消费成功或者到达最大延迟级别。
delay-level-when-next-consume
delay-level-when-next-consume
max-attempts
max-attempts
delay-level-when-next-consumemax-attempts
5.3 Demo01Consumer
修改 Demo01Consumer 类,在消费消息时抛出异常,从而模拟消费错误。代码如下:
5.4 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:
符合预期。从日志中,我们可以看到,消息因为消费失败后,又重试消费了多次。
6. 消费异常处理机制
在 Spring Cloud Stream 中,提供了通用的消费异常处理机制,可以拦截到消费者消费消息时发生的异常,进行自定义的处理逻辑。
下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用「5. 消费重试」小节的项目,使用 labx-06-sca-stream-rocketmq-producer-demo 发送消息,从 labx-06-sca-stream-rocketmq-consumer-retry 复制出 labx-06-sca-stream-rocketmq-consumer-error-handler 来演示消费异常处理机制。
6.1 复制项目
将「5. 消费重试」小节的 labx-06-sca-stream-rocketmq-consumer-retry,复制出 labx-06-sca-stream-rocketmq-consumer-error-handler。
6.2 Demo01Consumer
修改 Demo01Consumer 类,增加消费异常处理方法。完整代码如下:
#onMessage(@Payload Demo01Message message)..errorserrorChannel
友情提示:先暂时记住 Spring Integration 这样的设定,艿艿也没去深究 T T,也是一脸懵逼。
因此,我们有两种方式来实现异常处理:
- 局部的异常处理:通过订阅指定错误 Channel
- 全局的异常处理:通过订阅全局错误 Channel
#handleError(ErrorMessage errorMessage)#onMessage(@Payload Demo01Message message)
#globalHandleError(ErrorMessage errorMessage)@StreamListener
④ 在全局和局部异常处理都定义的情况下,错误消息仅会被符合条件的局部错误异常处理。如果没有符合条件的,错误消息才会被全局异常处理。
6.3 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:
不过要注意,如果异常处理方法成功,没有重新抛出异常,会认定为该消息被消费成功,所以就不会进行消费重试。
7. 广播消费
在上述的示例中,我们看到的都是使用集群消费,也是最常用的消费模式。而在一些场景下,我们需要使用广播消费。
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,使用 labx-06-sca-stream-rocketmq-producer-demo 发送消息,从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-broadcasting 来演示广播消费。
7.1 复制项目
将「2. 快速入门」小节的 labx-06-sca-stream-rocketmq-consumer-demo,复制出 labx-06-sca-stream-rocketmq-consumer-broadcasting。
7.2 配置文件
broadcastingtrue
7.3 简单测试
demo01-consumer-group-DEMO-TOPIC-01
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:
符合预期。从日志可以看出,每条消息仅被每个消费者消费了一次。
8. 顺序消息
RocketMQ 提供了两种顺序级别:
- 普通顺序消息:Producer 将相关联的消息发送到相同的消息队列。
- 完全严格顺序:在【普通顺序消息】的基础上,Consumer 严格顺序消费。
官方文档是这么描述的:
消息有序,指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
- 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
- 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。适用场景:性能要求高,以 Sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
注意,分区顺序就是普通顺序消息,全局顺序就是完全严格顺序。
下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:
8.1 搭建生产者
从 labx-06-sca-stream-rocketmq-producer-demo 复制出 labx-06-sca-stream-rocketmq-producer-orderly 来演示发送顺序消息。
8.1.1 配置文件
partition-key-expression
partition-key-expression
payload['id']ididDemo01Message.id
headers['partitionKey']
key.hashCode() % partitionCount
#determinePartition(Message message)
在获取到 Sharding key 之后,Spring Cloud Alibaba Stream RocketMQ 提供的 PartitionMessageQueueSelector 选择消息发送的队列。
id
这样,我们就能保证相同 Sharding Key 的消息,发送到相同的对应 RocketMQ Topic 的队列中。当前,前提是该 Topic 的队列总数不能变噢,不然计算的 Sharding Key 会发生变化。
8.1.2 Demo01Controller
修改 Demo01Controller 类,增加发送 3 条顺序消息的 HTTP 接口。代码如下:
id
另外,整列发送的虽然是顺序消息,但是和发送普通消息的代码是一模一样的。
8.2 搭建消费者
从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-broadcasting 来演示顺序消费消息
8.2.1 配置文件
orderly
8.2.2 Demo01Consumer
修改 Demo01Consumer 类,在消费消息时,打印出消息所在队列编号和线程编号,这样我们通过队列编号可以判断消息是否顺序发送,通过线程编号可以判断消息是否顺序消费。代码如下:
8.3 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_orderly 接口,发送顺序消息。IDEA 控制台输出日志如下:
id
9. 消息过滤
RocketMQ 提供了两种方式给 Consumer 进行消息的过滤:
- 基于 Tag 过滤
标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。
消息过滤目前是在 Broker 端实现的,优点是减少了 Broker 和 Consumer 之间的无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。
一般情况下,我们使用 Tag 过滤较多,我们来搭建一个 RocketMQ 使用 Tag 进行消息过滤的示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:
先搭建消费者。
9.1 Demo01Controller
修改 Demo01Controller 类,增加发送 3 条带 Tag 的消息的 HTTP 接口。代码如下:
MessageConst.PROPERTY_TAGS
再搭建消费者。
9.2 复制项目
从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-filter 来使用 Tag 过滤消息来消费。
9.3 配置文件
tagsyunai || yutouyunaiyutou
sql
9.4 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_tag 接口,发送带有 Tag 的消息。IDEA 控制台输出日志如下:
tudou
9.5 Demo01Consumer
咳咳咳:不知道如何取这标题,暂时用这个噶。
@StreamListenercondition
> /** > * A condition that must be met by all items that are dispatched to this method. > * @return a SpEL expression that must evaluate to a {@code boolean} value. > */ > String condition() default ""; >
@StreamListenercondition
rocketmq_TAGSyunai
9.6 再次测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_tag 接口,发送带有 Tag 的消息。IDEA 控制台输出日志如下:
tudouyutou
10. 事务消息
在分布式消息队列中,目前唯一提供完整的事务消息的,只有 RocketMQ 。关于这一点,还是可以鼓吹下的。
可能会有胖友怒喷艿艿,RabbitMQ 和 Kafka 也有事务消息啊,也支持发送事务消息的发送,以及后续的事务消息的 commit提交或 rollbackc 回滚。但是要考虑一个极端的情况,在本地数据库事务已经提交的时时候,如果因为网络原因,又或者崩溃等等意外,导致事务消息没有被 commit ,最终导致这条事务消息丢失,分布式事务出现问题。
相比来说,RocketMQ 提供事务回查机制,如果应用超过一定时长未 commit 或 rollback 这条事务消息,RocketMQ 会主动回查应用,询问这条事务消息是 commit 还是 rollback ,从而实现事务消息的状态最终能够被 commit 或是 rollback ,达到最终事务的一致性。
这也是为什么艿艿在上面专门加粗“完整的”三个字的原因。可能上述的描述,对于绝大多数没有了解过分布式事务的胖友,会比较陌生,所以推荐阅读如下两篇文章:
热心的艿艿:虽然说 RabbitMQ、Kafka 并未提供完整的事务消息,但是社区里,已经基于它们之上拓展,提供了事务回查的功能。例如说:Myth ,采用消息队列解决分布式事务的开源框架, 基于 Java 语言来开发(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架进行分布式事务。
下面,我们来搭建一个 RocketMQ 定时消息的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,修改 labx-06-sca-stream-rocketmq-producer-transaction 发送事务消息,继续使用 labx-06-sca-stream-rocketmq-consumer-demo 消费消息。
10.1 复制项目
从 labx-06-sca-stream-rocketmq-producer-demo 复制出 labx-06-sca-stream-rocketmq-producer-transaction 来发送事务消息。
10.2 配置文件
transactionaltrue
10.3 Demo01Controller
修改 Demo01Controller 类,增加发送事务消息的 HTTP 接口。代码如下:
args
10.4 TransactionListenerImpl
创建 TransactionListenerImpl 类,实现 MQ 事务的监听。代码如下:
"test"
② 实现 RocketMQLocalTransactionListener 接口,实现执行本地事务和检查本地事务的方法。
#executeLocalTransaction(...)
#sendMessageInTransaction(...)
RocketMQLocalTransactionState.UNKNOWN
#checkLocalTransaction(...)
RocketMQLocalTransactionState.COMMIT
一般来说,有两种方式实现本地事务回查时,返回事务消息的状态。
msg
msg
#executeLocalTransaction(...)idmsgRocketMQLocalTransactionState.UNKNOWNidmsgRocketMQLocalTransactionState.COMMITidmsgRocketMQLocalTransactionState.COMMITtry-catchcatchidmsgRocketMQLocalTransactionState.ROLLBACK#executeLocalTransaction(...)idmsg
相比来说,艿艿倾向第二种,实现更加简单通用,对于业务开发者,更加友好。和有几个朋友沟通了下,他们也是采用第二种。
10.5 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send_transaction 接口,发送事务消息。IDEA 控制台输出日志如下:
整个的执行过程,看看艿艿在日志上添加的说明。
11. 监控端点
Spring Cloud Stream 的 endpoint 模块,基于 Spring Boot Actuator,提供了自定义监控端点 bindings 和 channels,用于获取 Spring Cloud Stream 的 Binding 和 Channel 信息。
health
友情提示:对 Spring Boot Actuator 不了解的胖友,可以后续阅读《芋道 Spring Boot 监控端点 Actuator 入门》文章。
我们来搭建一个 Stream RocketMQ 监控端点的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:
11.1 搭建生产者
从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-actuator,查看生产者的监控端点结果。
11.1.1 引入依赖
在 pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:
11.1.2 配置文件
修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:
每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。
11.1.3 简单测试
① 使用 ProducerApplication 启动生产者。
bindings
channels
health
11.2 搭建消费者
从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-filter,查看消费者的监控端点结果。
11.2.1 引入依赖
在 pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:
11.2.2 配置文件
修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:
每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。
112.3 简单测试
① 使用 ConsumerApplication 启动消费者,随机端口为 19541。
bindings
channels
health
12. 更多的配置项信息
Spring Cloud Alibaba Stream RocketMQ 提供的配置项挺多的,我们参考文档将配置项一起梳理下。
RocketMQ Binder Properties
spring.cloud.stream.rocketmq.binder
name-server127.0.0.1:9876access-keysecret-keyenable-msg-tracetruecustomized-trace-topicRMQ_SYS_TRACE_TOPIC
RocketMQ Consumer Properties
spring.cloud.stream.rocketmq.bindings..consumer.
enabletruetags||sqlbroadcastingfalseorderlyfalsedelayLevelWhenNextConsumesuspendCurrentQueueTimeMillis
RocketMQ Provider Properties
enabletruegroupmaxMessageSizetransactionalfalsesyncfalsevipChannelEnabledtruesendMessageTimeoutcompressMessageBodyThresholdretryTimesWhenSendFailedretryTimesWhenSendAsyncFailedretryNextServerfalse
13.接入阿里云的消息队列 RocketMQ
在阿里云上,提供消息队列 RocketMQ 服务。那么,我们是否能够使用 Spring Cloud Alibaba Stream RocketMQ 实现阿里云 RocketMQ 的消息的发送与消费呢?
答案是可以。在 《阿里云 —— 消息队列 MQ —— 开源 Java SDK 接入说明》 中,提到目前开源的 Java SDK 可以接入阿里云 RocketMQ 服务。
如果您已使用开源 Java SDK 进行生产,只需参考方法,重新配置参数,即可实现无缝上云。
前提条件
- 已在阿里云 MQ 控制台创建资源,包括 Topic、Group ID(GID)、接入点(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
- 已下载开源 RocketMQ 4.5.1 或以上版本,以支持连接阿里云 MQ。
我们来搭建一个 Stream RocketMQ 监控端点的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:
13.1 搭建生产者
从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作为生产者。
access-keysecret-key
13.2 搭建消费者
从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作为消费者。
access-keysecret-key
13.3 简单测试
① 执行 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送消息。IDEA 控制台输出日志如下: