消息队列浅谈以及在小米的实践

本文目录:

1、消息队列介绍

2、常用的消息队列对比

3、小米的消息队列选型与实践

1、消息队列介绍

我最早真正使用消息队列应该是在2013年,那时候要使用Celery实现分布式任务,其内部集成了RabbitMQ,通过RabbitMQ来实现任务的发布和消费。后续又逐步接触了Kafka,RocketMQ,以及我们公司自研的Talos,Notify等等。

目前消息队列的应用比较广泛,主要得益于如下特性:

1、异步

不需要同步处理业务,可将任务放到队列中,消费者异步去处理。通过异步的方式可提高系统的整体性能、提高接口的响应时间,这对于有高吞吐量要求的系统来说是至关重要的。

2、解耦

通过消息队列将不同业务进行解耦,生产者和消费者相互独立,不必关心彼此的具体实现,相互也不会影响,只要按照两端约定的数据结构进行发送和消费即可。

3、流量削峰,限流缓解高并发

在高并发的场景下,将待处理任务放到队列中,避免瞬时有大量请求打到接口上。比如在秒杀中,经常会用到消息队列进行排队,缓解高并发压力。

目前市面上的消息队列中间件有很多,在选型时,主要是根据实际需求来选择,通常要考虑以下几个方面:

  • 消息顺序

消息顺序通常指的是消费者在消费消息时要完全按照发送消息的顺序消费。先发送的要保证先消费。这种场景在电商中比较常见。比如一个订单,可能要经过已支付,已出库,已妥投等状态流转。某些消费者要求必须严格按照正向的订单流转消息去消费,已出库不能在已支付的消息之前消费。我们可以看下Rocketmq顺序消息的实现。

生产顺序: 通过hashKey,将同一个key的消息一定发送到同一个队列上。每个队列的消息都是追加的,一定是按序存储的。不过在生产时,必须采用同步方式发送,否则异步的话不能保证一定是按序发送。

消费顺序:Rocketmq保证在集群模式下,一个消息只能被同一个Consumer Group下的一个Consumer消费;广播模式下,就算是同一个group,也有可能被多个Consumer消费。

所以如果想实现顺序消费,一个基本前提就是一定是集群模式。

  • 消息重复/消息幂等

消息重复指的是消息是否回重复发送或者重复消费。也就是说,在生产者和消费者两端都可能产生消息重复。目前的消息队列很少有保证不重复的,即At Most once,通常是需要我们消费者去保证幂等性。即使相同的消息多次发送,消费者都应该有能力保证相同的消息只会处理一次。

  • 消息丢失

消息丢失是指消息投递后,消费者还没来得及消费,消息就因为某种原因丢失了。目前市面的消息队列都能支持At least Once,可保证消息能够正常生产发送方到消息队列中,且在消费者消费之前不会丢失。

当然,诸如RocketMQ所谓的保证At least Once,也并不是百分百的,因为数据首先回先写入到PageCache中的,如果恰好整个集群都在落盘之前宕机,就会出现丢失。当然这个发生的概率非常小。对,非常小,但不是没可能。

  • 消息事务

我们通常所说的消息事务包括两种,一是实现分布式事务消息(本地事务执行成功,消息成功投递);二是多个消息同时发送,要么都成功,要么都失败。RocketMQ实现的是前者,通过消息分布式事务能力,可以确保生产者只有成功执行本地事务,才能够将消息发送到消息队列,并对消费者可见。目前提供这种能力的消息队列比较少,而RocketMQ提供了这样的能力,非常适用于分布式系统。这个地方可以看下RocketMQ官网对于半事务的介绍,我贴一下官网的示意图:

RokectMQ事务消息

Kafka实现的是后者,即保证多个消息同时发送时,要么都成功,要么都失败,其内部通过事务协调器来实现。下面是一张示意图:

  • 消息延迟

延迟强调的是延迟消费,Producer生产消息发送到消息队列后,不是立即对消费者可见,而是延迟一段时间之后,消费者才可消费。在电商中这种场景也是很常见的。比如下单后半个小时或一个小时之后还未支付的话就执行关单操作。目前很多消息队列都支持,比如RocketMQ,DDMQ,QMQ等,Kafka的并不对外提供这种能力,但内部有自用的定时任务,是通过时间轮实现的。

时间轮

数据结构是有一个环形队列,每个队列中是一个链表(单向或者双向的)。指针会按照一定的频率跳动,

比如一秒跳动一次,整个环形队列可能有60个slot。每移动一次到某个slot,就会将slot种的链表中的所有任务执行一遍。如果此时有新的定时任务出现,就会放到指针的前一个slot中。当然放到具体是那个slot,还要看定时任务具体的时间。比如要放到定时是2秒的,就放到距当前slot两个距离的节点中。

那现在有一个问题就是如果一个定时任务的时间已经超过了整个wheel的间距了,该怎么办呢?

针对该问题,还有一个分级时间轮的概念。每一个级时间轮的间距不断增大。

一个定时任务,会根据具体的定时时间决定添加到哪一个层级的时间轮中。

RocketMQ延迟消息

开源版本中,RocketMQ划分了18个Level,给每个level都设置了定时器。当发送的消息是个延迟消息时,首先会将消息发送到特殊的队列,topic是 SCHEDULE_TOPIC_XXXX,queueId是延迟级别减去1,并将tagsCode设置为延迟时间。 随后定时器会扫描对应级别的延迟队列,拿延迟时间和当前时间做比对,如果小于等于当前时间,就会将对应消息发送到正常的消息队列中。

默认延迟级别 :1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。 level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

感兴趣的可以看下这篇文章,介绍了不同的延迟消息的实现方案:
https://www.51cto.com/article/709743.html

  • 消费优先级

通过设置消息优先级,可以实现某些消息可以优先被消费。说到这个功能,只有在生产者生产消息的速度远远大于消费者消费消息的速度才有意义。RabbitMQ对消息优先级的支持比较好,他通过设置不同优先级的队列来实现。

  • 消费持久化

持久化是一个很重要的功能,如果消息队列的server宕机了,可以方便恢复当前所有生产的消息以及消费者消费消息进度等信息。目前几乎所有的消息队列都是支持持久化的,算得上是基本能力。不过持久化的存在势必会在一定程度上影响系统性能,因为写入磁盘是一件相对比较耗时的操作,不过这是一个可以优化的过程,比如通过异步、顺序读写来提升。

在kafka以及RocketMQ中的持久化具有较高的性能,主要得益于其使用了零拷贝、PageCache以及顺序IO等机制。

1、顺序IO

Kafka通过每次写数据追加到文件的末尾,保证顺序的写入。kafka会将每个partition进行分段,每段都是一个segment,文件分为.index和.log文件,分别记录了索引数据和文件数据。.index记录的是二元组。 比如 (5,102),表示在数据文件第5个消息,用基础偏移量,就是文件名的值+5就是全局的offset,其偏移地址是102.

和Kafka不同,RocketMQ是所有消息都在一个commitLog中,不过commitLog有大小限制,超过1G就要用新的commitLog,不过新的commitLog也是提前预热创建的。

2、PageCache

由于磁盘操作时一个比较耗时的操作,CPU更多的时间都是在等待。因此操作引入了一个磁盘缓存的概念,主要包括PageCache和BufferCache。PageCache主要是强调的对文件的缓存。

当Broker接收到数据后,它是先将数据写入到PageCache中,并不是直接写入到磁盘文件上,这就大大减少了操作时间,提升了性能。然后隔一段时间后,内核线程再将PageCache数据刷到磁盘上。

那么问题来了,当机器down机,且还没有来得及将数据刷到磁盘文件中,数据会丢失吗?

在极端情况下,的确是会丢的。在Kafka中,如果想保证不丢失,就使用同步刷盘的机制,但这样会严重影响整个系统的性能。可以通过减小刷盘间隔来尽量避免数据丢失。现在我们公司采用的Rocketmq也是采用异步刷盘的机制,通过使用分布式集群,保证只要有一个broker不掉线,PageCache就不会丢失。

通过直接写内存,保证了写的性能。如果读的速度和写的速度相当,那么读的操作也可以直接从PageCache获取。这种不经过磁盘文件的操作是其高性能的重要原因之一。

3、零拷贝

之前我写了一篇文章叫 Linux零拷贝技术浅谈 ,已经详细地介绍了零拷贝的技术,这里说一下消息队列的应用。

Kafka在写入时使用了mmap的方式,在发送时使用了sendfile。它也是直接调用了JAV API transferTo,最终调用操作系统的sendfile。

然而Rocketmq是只使用了mmap+write的方式。主要是考虑通常情况都是小数据传输,而sendfile只是将文件描述符拷贝到缓冲区,只有通过阻塞的方式完成数据文件发送。mmap可以使用异步方式发送。

JAVA的mmap通过MapperByteBuffer实现的。现在模仿一个写操作:

  public void writeByMap(){
        Path path = Paths.get("/home/haibo/t.py");
        String haibo = "dwdwdeghrhyu5u6hbheu68i7jnbsft37u8ihewt8u";
        byte[] bytes = haibo.getBytes(StandardCharsets.UTF_8);
        try{
            FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING);
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,bytes.length);
            if (mappedByteBuffer != null){
                mappedByteBuffer.put(bytes);
                mappedByteBuffer.force();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

通过strace -c -tt javac IOPra 看一下系统调用:

% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 20.88    0.000753          18        41        29 openat
 17.91    0.000646          14        45           mmap
 14.50    0.000523         523         1           execve
 10.68    0.000385          16        24        21 stat
 10.29    0.000371          23        16           mprotect
  3.52    0.000127          15         8           pread64
  3.13    0.000113          56         2           munmap
  3.00    0.000108           9        12           fstat
  2.41    0.000087           7        12           close
  2.38    0.000086          43         2           readlink
  2.33    0.000084           7        11           read
  1.97    0.000071          71         1           clone
  1.83    0.000066          22         3         2 access
  1.22    0.000044          14         3           brk
  0.80    0.000029          14         2         1 arch_prctl
  0.75    0.000027          13         2           getpid
  0.69    0.000025          12         2           rt_sigaction
  0.50    0.000018          18         1           set_tid_address
  0.50    0.000018          18         1           set_robust_list
  0.39    0.000014          14         1           prlimit64
  0.31    0.000011          11         1           rt_sigprocmask
  0.00    0.000000           0         1           futex
------ ----------- ----------- --------- --------- ----------------
  • 消息回溯

消息回溯是指当消费者消费完消息之后,是否还可以回溯找到对应的消息。消息回溯的场景适用于重试以及对账等业务。

2、常用的消息队列对比

现如今,市面上的消息队列中间件琳琅满目,至少我都接触了有5,6,7,8种了。不敢说他们在重复造轮子,只能说大家侧重点不同,都是在某些特定场景下才开发的。下面几个是比较常用的消息队列:

  1. Kafka 几乎所有人都听过了,apache孵化项目,较早的一款消息队列,很多消息队列都是参考kafka开发的。
  2. RabbitMQ 使用Erlang语言开发的,对很多来讲,维护和二次开发不是特别友好。
  3. RocketMQ 阿里开源 apache孵化项目,分开源和商业两个版本
  4. ActiveMQ apache项目
  5. DDMQ   滴滴开源 我司的延迟就参考了DDMQ。
  6. QMQ 去哪儿网,可称道的地方是使用多级时间轮实现了任意时间延迟
  7. NSQ Golang开发的消息队列
  8. ............

以上并未完全列出现有的消息队列,现在就三种我接触过的消息队列进行对比,通过消息队列的不同特性进行多维度比较。

RocketMQ

Kafka

Rabbitmq

消息重复/幂等

不保证消息重复,需要在消费端自行实现。

早期版本不保证消息重复,需要在消费端自行实现。自从0.11.0.0之后,可实现生产消息不重复,Exactly Once。

不保证消息重复,需要在消费端自行实现

消息丢失

可以保证at least once ,因此不会丢失。

在生产端,只有broker回复SUCCESS,生产端才人为投递成功;

在消费端,只有回传SUCCESS给broker,才被认为消费成功。

当然,发送模式应该是同步活异步,如果发送模式选择oneway则不保证。

但有一种情况可能出现丢失:broker集群收到消息了,但是采用异步刷盘的话,可能在落盘之前,整个集群都挂掉了,当然发生概率会很低。

同理,也可保证消息不丢失。其实RocketMQ也是参考kafka来实现的。同样在生产消息以及消费消息的过程都会有确认机制。

可以保证不丢失

但是 队列和消息都要配置持久化,否则只在内存中,还是有可能丢失的。

消息持久化

支持,持久化到文件

支持,持久化到文件

需要配置队列和消息的持久化。

消息优先级 不支持 不支持 支持

消息顺序

在同一个逻辑队列中,可以实现消息的顺序。全局也可以,但是效率较差。

当使用顺序消息时,消费端同样要改成顺序消费,其默认是并发消费,即会同时开多个线程并发消费。

同一个partition下的可以实现消息顺序。

同理,将要顺序消费的消息放在一个Queue

消息延迟

支持延迟,延迟实现的方式(开源版)是设置不同级别的延迟队列,队列中通过定时去处理到期消息,其共有18个级别的定时。

在到时之前投递在一个专门的TOPIC下。

不支持。但内部通过时间轮实现了定时任务,不对外提供相应功能。

本身不支持,但可通过延迟插件或者死信队列方式实现。可以访问下面的参考资料

消息回溯

由于最后文件是写到磁盘文件,是支持消息回溯的。但过期删除的查看不了,一般上可配置7天或一个月。

支持回溯,支持按照offset和timestamp两种维度进行回溯。

不支持,当消息被消费之后,就会被删除。

消息事务

支持在生产者和broker之间的半事务。具体流程可参考官网。简单说就是通过双方的commit,以及事务状态回查来实现。

在事务完全提交之前,是个半事务消息,也不会消费。

自0.11开始,kafka就支持了事务。但这个事务和RocketMQ的事务不是一回事,RocketMQ强调分布式事务的场景。Kafka强调的是一次发多个消息的事务特性。即多个消息要么都成功,要么都失败。

虽然Rabbitmq事务机制,但感觉其并非为了支持分布式事务,而只是一种支持At least once的保证机制。

消费重试

集群模式下,如果未成功消费,可以重试,超过一定次数进入死信队列;

广播模式下,不支持重试。

不支持。和RocketMQ不同,kafka不需要消费者提供消费ack,而且不自动维护偏移量,只是提供偏移量更新接口。不过可以根据这个特性实现自动重试。

支持。默认支持消费端的确认机制。

消费模式

本质上都是PULL模式。其push也是长轮询进行pull。

采用pull模式,由消费者自行决定消费频率和消费数量。

主要是Push,其也有pull,但性能较差

消费分组 支持消费分组 支持消费分组 不支持,一个消息只能被一个消费者消费

高可用

设置NameServer集群,每个节点都会存储全部的路由和topic信息。

broker集群:多master+多slave

且 broker有专门负责master到slave的同步

集群部署,早期使用ZK管理。后期自实现KRaft进行集群管理。

有主队列和镜像队列,可以保证高可用。

高性能

通过顺序IO、零拷贝、PageCache、分区、索引文件等多种方式来实现高可用。

并发消费等。

通过顺序IO、零拷贝、PageCache、分区、索引文件等多种方式来实现高可用。

性能一般,主要其采用Push模式

3、小米的消息队列选型与实践

小米的消息队列采用了很多种, Notify,Kafka,EMQ,RabbitMQ,Talos,RocketMQ,MQTT等。我们并不是为了使用而使用,主要还是因为部门较多,不同部门业务不同,侧重点不同,因此在选型消息队列时就会根据实际业务需求去选择更合适的消息队列。

1、RabbitMQ的使用

在小米使用RabbitMQ最典型的场景是邮件中继。在2021年,公司要求所有部门自动发送邮件时不可直连邮件服务器,而是要经过一个邮件中继Guard,其最主要的目的是让Guard做一层治理,比如流量管控,避免大量请求都打到邮件服务器。如果某个大神程序没写好,每个订单都发送一封邮件,一天几十万订单,可是要了命了。Guard内部就使用了RabbitMQ,业务系统将邮件发送到Guard,Guard会将所有邮件作为消息头递到消息队列,随后RabbitMQ负责将消息push到消费者(邮件服务器),在上面有过介绍,push消息的速率完全由broker掌握,我们可以控制Push消息的频率,类似大家经常说的限流,消峰,这就可以有效避免邮件服务器被瞬时大量邮件打垮。

不过也正是因为这点,经常会出现消息堆积,邮件发不出去的场景。如果某个业务发送了大量邮件,可能会影响到其他业务的邮件发送。如何做到相互不影响,也是Guard团队重点要解决的问题。

2、RocketMQ

RocketMQ也是这几年才在小米大规模使用的,尤其是电商系统,像有品和小米商城,目前只用RocketMQ作为实际业务的消息队列,日志会用到talos。之前的有品使用过原生的Kafka,也使用过公司自研的Notify。Notify是我个人非常不认同的一个中间件,其内部使用了Mysql和Redis,作为消息队列,非常不成熟,其架构类似如下:

后期经过调研,我们最终都换成了RocketMQ,主要是考虑到RocketMQ的几个特性:

  • 事务写入
  • Key级别顺序消息,可以用订单号作为hashkey
  • 重试 ,支持不同模式的重试(顺序消费时默认无限重试,并发消费可间隔重试16次数)
  • 死信队列 当达到一定消费次数之后,就直接进入死信队列,方便后续手动触发。

小米RocketMQ发展轨迹:

我们使用的是开源的RocketMQ,众所周知,其只支持固定级别的延迟,为了实现任意时间的延迟,小米云团队参考了DDMQ的经验借助RocksDB和时间轮以插件的形式无侵入地支持了任意时间的延迟。

此外,针对于Pull模式的缺点,也做了改进。由于RocketMQ要求一个分区只能同时被一个消费者消费(同组),因此当消费者数量大于分区数量时,多出的消费者是不能进行消费的,这无疑是一种浪费。因此针对这点,小米进行了优化。优化如下:

RocketMQ在现阶段是小米业务系统的不二选择。

3、Talos

Talos是小米自研的一个消息队列,已经比较早了,设计它的初衷是因为当时使用的是Kafka 0.8 版,当时版本的Kafka自身存在很多缺点,比如集群扩容和故障恢复时非常麻烦。

Talos实际上也是参考Kafka进行开发的,其主要变化:

1、将存储和结算相分离。存储采用HDFS,TalosServer只负责调度;

2、采用一致性hash实现负载均衡。

Talos架构

据Talos团队介绍,目前Talos实现了:

  • 日处理消息数超过 2 万亿条,日消息峰值 4 千万条/秒,日处理数据量 1.3PB;
  • Topic 总数 13000+,下游的作业数 15000+,接入业务数量 350+ ;

其实,Talos还实现了Exactly Once,这是我觉得比较好的一点(当然,新版本的kafka也已经实现了)。基本思想是在生产者生产一个序列号,同样的序列号,使得broker不会再继续处理;在消费端,先查redis缓存,看是否已经处理同样的序列号,如果已处理,就不再继续处理。

关于Talos这部分,我建议大家看下面参考资料中的小米消息队列的实践,里面详解介绍了talos的由来,架构和特点,非常赞。

参考资料:

消息队列——延时消息应用解析及实践-阿里云开发者社区

阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台

千与千寻-浅谈Kafka以及Rocketmq的高性能

万亿级消息背后: 小米消息队列的实践_云计算_勇幸_InfoQ精选文章

--------EOF---------
微信分享/微信扫码阅读