Rocketmq初探

最近,公司开始逐步地将消息队列从talos(公司基于Kafka二次开发的)迁移到Rocketmq上了,主要还是考虑到其具备的优越性。

对于电商来说,有时订单系统和优惠系统,以及商品系统,售后系统需要对接,一般都会采用消息队列。但多数消息队列并不支持事务,而Rocketmq却支持这一点。此外Rocketmq可以保证按照key级别的顺序消息,对于某一个key,比如订单号,创建订单、支付订单、订单发货等等状态流转都是按照顺序的,他们会发送到同一个队列中,每个消费者也只会消费某个队列,这样就保证了顺序消费。同时还支持重试以及死信队列。总之Rocketmq主要还是为解决电商的实际使用场景而产生的。

我会带着一些问题去学习:

  1. Rocketmq怎么保证高可用的;
  2. Rocket的Producer和Consumer如何实现负载均衡的;
  3. Rocketmq如何保证消息的顺序性的;
  4. Rocketmq怎么处理消息丢失、消息重复的问题;
  5. Rocket是如何实现事务消息的;
  6. Rocketmq的底层通信方式

所有知识点都应该结合源码来学习的。

1、Rocketmq怎么保证高可用的

先来看看Rocketmq的架构:

在Rocketmq4.5版本之前,和Kafka的集群中有一个leader,leader挂掉后就重新选举一个不同,Rocketmq是一个master和多个slver,也可以是多个master,多个slave,或者多个master。如果是一个master多个slave,如果master挂掉,slave是不能自动切换的。但多个master如果有一个挂掉,还是可以继续用的。

一个topic可以发送到多个master中。每个master的多个slave.master的brokerid是0,其他的大于等于1.

由于上述中master故障,需要手动切换,4.5版本之后也实现了基于Raft协议的选举,类似Kafka.

其实现方式并不是如Kafka那样通过zk实现,而是使用基于 Raft 协议的 commitlog 存储库 DLedger。具体可阅读下面的参考资料。总之DLedger发现在某一组Broker中的master挂掉之后,可自动完成选举,实现slave到master的自动切换。通过异步和并发线程提升处理速度。

主从复制可以有异步和同步,同步意味着只有slave也成功同步了,才会commit;异步则不需要等待slave完成同步即可commit;

刷盘也分为同步和异步,同步刷盘可能要影响性能。

Rocketmq和Kafka的思想还略有不同,通过多个master以及主从复制等来实现高可用,减少了系统复杂度。

那说到了Rocketmq的集群,不得不提集群的扩容和缩容。Rocketmq可以实现动态的扩容和缩容。

现在有个问题是扩容或者缩容是消费者和消费者是如何感知。

无论是扩容还是缩容,都要执行updateTopic命令,会更新NameServer存储的节点信息,Producer和Consumer都会从NameServer获取最新的broker信息。Producer和Consumer都会和NameServer集群中的某个节点保持心跳,每隔30s获取一次最新的队列。

Rocket的Producer和Consumer如何实现负载均衡的

负载均衡主要分为生产端和消费端的负载均衡。

在Producer端,会根据获取的topic路由信息选择一个队列进行send。下面是官网的描述:

Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

看下源码:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
       //...略去
      for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

上面的selectOneMessageQueue就是具体的选择发送队列的方法。

具体实现:

 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

同样地,在消费端也实现了负载均衡,一般一个队列只能被一个消费者消费,但是一个消费者可以消费多个队列。

首先消费端实现负载均衡的前提是它会定时向所有的broker发送心跳的。其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。

(1) 从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet);

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

(2) 根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者Id列表的RPC通信请求(Broker端基于前面Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);

List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

最后调用的是MQClientAPIImpl的方法:

 public List<String> getConsumerIdListByGroup(
        final String addr,
        final String consumerGroup,
        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        MQBrokerException, InterruptedException {
        GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    GetConsumerListByGroupResponseBody body =
                        GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                    return body.getConsumerIdList();
                }
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

(3) 先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。

(4) 然后,调用updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。

boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }

Rocketmq如何保证消息的顺序性的

比如在电商中比较常见的订单状态的流转,订单创建、支付、发货、签收等等。每个状态要保证顺序的。

Rocketmq可以保证同一个Key(唯一的)都发送到同一个队列中,而队列是FIFO的,所以可以保证顺序发送。在消费端,一个队列只能被同一个group下的一个消费者消费,那就可以保证顺序消费。

看上面可以明白了,顺序消息是在同一个topic下讨论的,不是全局的。

看下源码吧:

主要是使用RocketMqTemplate的syncSendOrderly:

 public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
        if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {
            try {
                long now = System.currentTimeMillis();
                org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
                SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
                long costTime = System.currentTimeMillis() - now;
                if (log.isDebugEnabled()) {
                    log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                }

                return sendResult;
            } catch (Exception var12) {
                log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
                throw new MessagingException(var12.getMessage(), var12);
            }
        } else {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
    }

selecetor:

private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

 @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }

上面则是获取队列方式,通过hashkey取模。

顺序消息常见问题

  • 同一条消息是否可以既是顺序消息,又是定时消息和事务消息?

    不可以。顺序消息、定时消息、事务消息是不同的消息类型,三者是互斥关系,不能叠加在一起使用。

  • 顺序消息支持哪些地域?

    支持消息队列RocketMQ版所有公共云地域和金融云地域。

  • 为什么全局顺序消息性能一般?

    全局顺序消息是严格按照FIFO的消息阻塞原则,即上一条消息没有被成功消费,那么下一条消息会一直被存储到Topic队列中。如果想提高全局顺序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少处理本地业务逻辑的耗时。

  • 顺序消息支持哪种消息发送方式?

    顺序消息只支持可靠同步发送方式,不支持异步发送方式,否则将无法严格保证顺序。

  • 顺序消息是否支持集群消费和广播消费?

    顺序消息暂时仅支持集群消费模式,不支持广播消费模式。

Rocketmq的事务特性

Rocketmq一个重要的特性就是支持事务消息的。其实目前Kafka和Rabbitmq等也都支持。一般说支持事务都是指的半事务,即生产者和消息队列的事务,而不包括消费者。

下面是官网的一个事务示意图。

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

下面的源码就是上面流程实现:

 public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        
        //获取自定义的事务listener
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // ignore DelayTimeLevel parameter
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
             //发送事务消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
         //根据消息队列响应结果执行本地事务
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            //根据本地事务执行状态,发送消息到消息队列,完成本次消息事务的commit或者rollback;
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

Rocketmq的底层通信

Rocketmq底层也是使用的Nettt框架通信,是在其基础上实现了自定义的通信协议。采用的模型为多Reactor多线程模型。具体可见: Java的NIO及Netty

通信流程就直接从官网上摘抄下来:

RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:

(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。

(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。

(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

其中用的比较多的就是NettyRemotingClient对象。

一个典型调用关系:

DefaultMqProducerImpl->MqClientInstance->MqClientApiImpl->NettyRemotingClient。

在Producer启动时,会start Netty客户端:

 @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

未完待续。。。

参考资料:

RocketMQ保证高可用和高性能的几种措施

Rocketmq官网

rocketmq 如何保证高可用_Kafka和RocketMQ如何保证高可用

Rocketmq扩容思路

消费者Rebalance机制

RocketMQ 实现高可用多副本架构的关键:基于 Raft 协议的 commitlog 存储库 DLedger

Rocketmq消费端负载均衡

--------EOF---------
微信分享/微信扫码阅读