Rocketmq初探
最近,公司开始逐步地将消息队列从talos(公司基于Kafka二次开发的)迁移到Rocketmq上了,主要还是考虑到其具备的优越性。
对于电商来说,有时订单系统和优惠系统,以及商品系统,售后系统需要对接,一般都会采用消息队列。但多数消息队列并不支持事务,而Rocketmq却支持这一点。此外Rocketmq可以保证按照key级别的顺序消息,对于某一个key,比如订单号,创建订单、支付订单、订单发货等等状态流转都是按照顺序的,他们会发送到同一个队列中,每个消费者也只会消费某个队列,这样就保证了顺序消费。同时还支持重试以及死信队列。总之Rocketmq主要还是为解决电商的实际使用场景而产生的。
我会带着一些问题去学习:
- Rocketmq怎么保证高可用的;
- Rocket的Producer和Consumer如何实现负载均衡的;
- Rocketmq如何保证消息的顺序性的;
- Rocketmq怎么处理消息丢失、消息重复的问题;
- Rocket是如何实现事务消息的;
- Rocketmq的底层通信方式
所有知识点都应该结合源码来学习的。
1、Rocketmq怎么保证高可用的
先来看看Rocketmq的架构:
在Rocketmq4.5版本之前,和Kafka的集群中有一个leader,leader挂掉后就重新选举一个不同,Rocketmq是一个master和多个slver,也可以是多个master,多个slave,或者多个master。如果是一个master多个slave,如果master挂掉,slave是不能自动切换的。但多个master如果有一个挂掉,还是可以继续用的。
一个topic可以发送到多个master中。每个master的多个slave.master的brokerid是0,其他的大于等于1.
由于上述中master故障,需要手动切换,4.5版本之后也实现了基于Raft协议的选举,类似Kafka.
其实现方式并不是如Kafka那样通过zk实现,而是使用基于 Raft 协议的 commitlog 存储库 DLedger。具体可阅读下面的参考资料。总之DLedger发现在某一组Broker中的master挂掉之后,可自动完成选举,实现slave到master的自动切换。通过异步和并发线程提升处理速度。
主从复制可以有异步和同步,同步意味着只有slave也成功同步了,才会commit;异步则不需要等待slave完成同步即可commit;
刷盘也分为同步和异步,同步刷盘可能要影响性能。
Rocketmq和Kafka的思想还略有不同,通过多个master以及主从复制等来实现高可用,减少了系统复杂度。
那说到了Rocketmq的集群,不得不提集群的扩容和缩容。Rocketmq可以实现动态的扩容和缩容。
现在有个问题是扩容或者缩容是消费者和消费者是如何感知。
无论是扩容还是缩容,都要执行updateTopic命令,会更新NameServer存储的节点信息,Producer和Consumer都会从NameServer获取最新的broker信息。Producer和Consumer都会和NameServer集群中的某个节点保持心跳,每隔30s获取一次最新的队列。
Rocket的Producer和Consumer如何实现负载均衡的
负载均衡主要分为生产端和消费端的负载均衡。
在Producer端,会根据获取的topic路由信息选择一个队列进行send。下面是官网的描述:
Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。
看下源码:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//...略去
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
上面的selectOneMessageQueue就是具体的选择发送队列的方法。
具体实现:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
同样地,在消费端也实现了负载均衡,一般一个队列只能被一个消费者消费,但是一个消费者可以消费多个队列。
首先消费端实现负载均衡的前提是它会定时向所有的broker发送心跳的。其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
(1) 从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet);
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
(2) 根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者Id列表的RPC通信请求(Broker端基于前面Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
最后调用的是MQClientAPIImpl的方法:
public List<String> getConsumerIdListByGroup(
final String addr,
final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetConsumerListByGroupResponseBody body =
GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
return body.getConsumerIdList();
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
(3) 先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。
(4) 然后,调用updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
Rocketmq如何保证消息的顺序性的
比如在电商中比较常见的订单状态的流转,订单创建、支付、发货、签收等等。每个状态要保证顺序的。
Rocketmq可以保证同一个Key(唯一的)都发送到同一个队列中,而队列是FIFO的,所以可以保证顺序发送。在消费端,一个队列只能被同一个group下的一个消费者消费,那就可以保证顺序消费。
看上面可以明白了,顺序消息是在同一个topic下讨论的,不是全局的。
看下源码吧:
主要是使用RocketMqTemplate的syncSendOrderly:
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
} catch (Exception var12) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(var12.getMessage(), var12);
}
} else {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
}
selecetor:
private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
上面则是获取队列方式,通过hashkey取模。
顺序消息常见问题
-
同一条消息是否可以既是顺序消息,又是定时消息和事务消息?
不可以。顺序消息、定时消息、事务消息是不同的消息类型,三者是互斥关系,不能叠加在一起使用。
-
顺序消息支持哪些地域?
支持消息队列RocketMQ版所有公共云地域和金融云地域。
-
为什么全局顺序消息性能一般?
全局顺序消息是严格按照FIFO的消息阻塞原则,即上一条消息没有被成功消费,那么下一条消息会一直被存储到Topic队列中。如果想提高全局顺序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少处理本地业务逻辑的耗时。
-
顺序消息支持哪种消息发送方式?
顺序消息只支持可靠同步发送方式,不支持异步发送方式,否则将无法严格保证顺序。
-
顺序消息是否支持集群消费和广播消费?
顺序消息暂时仅支持集群消费模式,不支持广播消费模式。
Rocketmq的事务特性
Rocketmq一个重要的特性就是支持事务消息的。其实目前Kafka和Rabbitmq等也都支持。一般说支持事务都是指的半事务,即生产者和消息队列的事务,而不包括消费者。
下面是官网的一个事务示意图。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
下面的源码就是上面流程实现:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//获取自定义的事务listener
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送事务消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
//根据消息队列响应结果执行本地事务
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//根据本地事务执行状态,发送消息到消息队列,完成本次消息事务的commit或者rollback;
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
Rocketmq的底层通信
Rocketmq底层也是使用的Nettt框架通信,是在其基础上实现了自定义的通信协议。采用的模型为多Reactor多线程模型。具体可见: Java的NIO及Netty
通信流程就直接从官网上摘抄下来:
RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。
其中用的比较多的就是NettyRemotingClient对象。
一个典型调用关系:
DefaultMqProducerImpl->MqClientInstance->MqClientApiImpl->NettyRemotingClient。
在Producer启动时,会start Netty客户端:
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
未完待续。。。
参考资料:
rocketmq 如何保证高可用_Kafka和RocketMQ如何保证高可用
RocketMQ 实现高可用多副本架构的关键:基于 Raft 协议的 commitlog 存储库 DLedger
微信分享/微信扫码阅读