消息队列之顺序消息
说顺序消息主要是包括两个方便,一个消息的顺序生产,比如订单。消息的生产一定是按照 创建->支付->发货->退款等顺序进行的;另外一个是顺序消费,消费者会严格按照消息的生产顺序进行消费。
Rocketmq:
生产顺序: 通过hashKey,将同一个key的消息一定发送到同一个队列上。每个队列的消息都是追加的,一定是按序存储的。不过在生产时,必须采用同步方式发送,否则异步的话不能保证一定是按序发送。
消费顺序:
Rocketmq保证在集群模式下,一个消息只能被同一个Consumer Group下的一个Consumer消费;广播模式下,就算是同一个group,也有可能被多个Consumer消费。
所以如果想实现顺序消费,一个基本前提就是一定是集群模式。
根据Rocketmq的Rebalance实现队列的分配:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
//队列集合
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//消费者集合
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//根据不同分配策略完成分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
虽然,一个队列只会被同组的一个消费者处理,但对于一个消费者内的多个线程是怎么做到是顺序消费的呢?
简单来说,首先会有一个线程负责周期性的将消息拉取到本地,只有成功获取远端Broker对应队列的锁才会执行拉取消息操作。拉到本地的队列之后,如果是顺序消息,每次消费都会加锁,只有获取锁的才能够消费。如果失败的就会重试。
如果消费端要实现顺序消费需要配置一下消费模式,consumerMode,默认是并发模式。
参考资料:
1005-RocketMQ源码解析:Message拉取&消费(下).md
--------EOF---------
微信分享/微信扫码阅读
微信分享/微信扫码阅读