消息队列之顺序消息

说顺序消息主要是包括两个方便,一个消息的顺序生产,比如订单。消息的生产一定是按照 创建->支付->发货->退款等顺序进行的;另外一个是顺序消费,消费者会严格按照消息的生产顺序进行消费。

Rocketmq:

生产顺序: 通过hashKey,将同一个key的消息一定发送到同一个队列上。每个队列的消息都是追加的,一定是按序存储的。不过在生产时,必须采用同步方式发送,否则异步的话不能保证一定是按序发送。

消费顺序:

Rocketmq保证在集群模式下,一个消息只能被同一个Consumer Group下的一个Consumer消费;广播模式下,就算是同一个group,也有可能被多个Consumer消费。

所以如果想实现顺序消费,一个基本前提就是一定是集群模式。

根据Rocketmq的Rebalance实现队列的分配:

  private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                //队列集合
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                 //消费者集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
           

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                       //根据不同分配策略完成分配
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

虽然,一个队列只会被同组的一个消费者处理,但对于一个消费者内的多个线程是怎么做到是顺序消费的呢?

简单来说,首先会有一个线程负责周期性的将消息拉取到本地,只有成功获取远端Broker对应队列的锁才会执行拉取消息操作。拉到本地的队列之后,如果是顺序消息,每次消费都会加锁,只有获取锁的才能够消费。如果失败的就会重试。

如果消费端要实现顺序消费需要配置一下消费模式,consumerMode,默认是并发模式。

参考资料:

Rocketmq源码分析13:consumer 消费偏移量

4.0 消费者Rebalance机制

1005-RocketMQ源码解析:Message拉取&消费(下).md

--------EOF---------
微信分享/微信扫码阅读