我们知道RocketMQ主要分为消息 生产、分钟存储(消息堆积)、白话消费 三大块领域。系列息
那接下来,送消我们白话一下,分钟RocketMQ是白话如何发送消息的,揭秘消息生产全过程。系列息
注意,送消如果白话中不小心提到相关代码配置与类名,分钟请参考RocketMQ 4.9.4版本
RocketMQ生产消息时,支持多种「消息类型」:
SendResult send(final Message msg);
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg);
SendResult send(final Message msg, final MessageQueue mq);
上面列举的三种send方法,都是以同步发送模式为例。定时/延迟消息从发送方式角度来说,不算一种独立的消息类型。
一般我们要发送一条消息给RocketMQ,需要创建这样一个消息体。
Message msg = new Message( "TestTopic", "Hello World".getBytes() );
在这个消息体里面,我们只单纯指定了要发送的Topic名字,以及要发送的消息内容。
那么,RocketMQ-client怎么知道这条消息要发送到RocketMQ集群中的哪一个broker上呢?
这里需要了解下RocketMQ中Topic的「路由注册与发现机制」。
RocketMQ基本架构
Topic 路由注册与发现:
Topic路由信息
Topic的路由信息,包括了Topic的 队列queue和broker的映射关系 ,那么如何利用这个Topic的路由信息呢?
我们需要根据前面的不同「消息类型」进行分别讨论:
根据消息类型获取到目标队列queue后,就可以根据Topic路由信息发送消息到指定broker上了。
从发送模式角度来说,RocketMQ有三种「消息发送模式」:
SendResult send(final Message msg);
void send(final Message msg, final SendCallback sendCallback);
void sendOneway(final Message msg);
上面列举的三种send方法,都是以「普通消息」为例。
「消息类型」 和 「消息发送模式」 是 N*M 的关系,所以聪明的你一定已经想到了,存在9种不同组合(不包括事物消息),RocketMQ也是在接口中定义了9种不同方法。
前面介绍了三种「消息发送模式」,其中「单向发送」属于不可靠发送,我们无法知道是否发送成功。
而「同步发送」和「异步发送」都是可靠发送,我们能够获取发送状态,知道是否成功。
在「同步发送」中,我们可以根据SendResult中的sendStatus属性判断是否发送成功。
SendResult类属性
在「异步发送」中,我们可以自定义实现SendCallback的onSuccess()方法和onException()方法,来判断消息是否发送成功。
SendCallback接口定义
如果消息发送失败了,RocketMQ-client默认有重试机制,以确保消息的高可用性。
前面提到,生产者每30秒获取一次主题的路由信息,所以即使某个 Broker 宕机,消息发送者可能无法立即察觉到它的宕机状态。
但是,当消息发送者向某个 Broker 发送消息后,如果返回异常,生产者会在接下来的一段时间内(例如5分钟)避免再次选择该 Broker 上的队列来发送消息。这样做的目的是规避可能发生故障的 Broker。
当然了,用户也能根据返回的异常,自己定义业务重试、补偿机制。
需要注意的是,不同「消息类型」和「消息发送模式」的RocketMQ-client默认重试机制不同。
消息类型:
注意:有序消息异常时RocketMQ-client都是默认不重试
消息发送模式:
注意:单向发送模式异常时RocketMQ-client默认不重试
(责任编辑:探索)
2021年前三季度国内旅游总人次26.89亿 旅游收入2.37万亿元
2021年1—8月份全国规模以上工业企业利润同比增长49.5% 总额56051.4亿元
帅丰电器(605336.SH)拟推176.25万股限制性股票激励计划 授予价格为13.62元/股