结合Go聊聊nsq
今天学习了一下nsq,以及在Go中如何使用nsq,做一下总结。
之前使用过RabbitMQ,是因为当初在我自己的网站上,我希望能够异步执行一些任务,然后选用了celery,celery默认使用的消息队列就是RabbitMQ。最近,因为学习Go,就接触了比较牛的nsq。
1、NSQ介绍
看过NSQ的人,可能都会见到这么一段话:
NSQ是一个基于Go语言的分布式实时消息平台。
NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。
要了解nsq,就要了解nsq几个重要的组件:nsqlookupd,nsqd以及nsqadmin.
下面先放出它们的官方定义:
nsqd:一个负责接收、排队、转发消息到客户端的守护进程
nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
1、nsqlookupd
nsqlookupd用来管理拓扑信息,客户端可以通过其查询某个topic的生产者,比如消费者连接到nsqlookupd;此外,nsqd节点要广播其topic和channel信息给lookupd,我的理解是相当于注册到nsqlookupd上,从而消费者可以发现它。当我们有多个独立的nsqd上的时候,需要将多个nsqd节点都连到nsqlookupd上。
nsqlookupd有两个端口,一个TCP端口用于nsqd节点广播(默认4160),另一个HTTP端口用于执行发现和管理操作,比如查询某个topic的生产者等。
我现在启动一个nsqlookupd:
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
2、nsqd
nsqd是nsq比较重要的一个组件,也可以说是核心,它会监听某个端口上发过来的二进制消息,并将topic等信息广播出去,比如生产者向其端口发送消息;此外,每当有nsqd启动时,它会自动向nsqlookupd进行注册。通常,nsqd之间是独立的,互不干扰的,一般生产者都会连接到本地的nsqd。
当然,消费者也可以直接连到nsqd上,读取消息。但通常我们都不会直接让worker直接连接到nsqd上,而是通过nsqlookupd。
现在启动一个nsqd:
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160
如上,该nsqd节点是连上了nsqlookupd。
好,现在结合Go分析一下nsq的工作流程。流程图:
1、首先本地的生产者向本地nsqd节点发布消息。在该过程中,会建立一个长链接,并发布带有topic消息题的Pub命令。生产者实例代码(使用go-nsq):
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
//创建一个生产者。地址为nsqd的一个节点,端口为nsqd的TCP端口。
p,err := nsq.NewProducer("172.17.64.119:4150",config)
if err != nil {
fmt.Println(err)
return
}
err = p.Ping()
if err != nil {
fmt.Println("ping: ",err)
return
}
pubName := "testnsqpub"
defer p.Stop()
for i:=0;i<3;i++{
// 发布topic为pubName的消息。
err = p.Publish(pubName,[]byte("test async:haibo,please trust yourself"))
if err != nil {
fmt.Println("err in pub",err)
}else {
fmt.Println("sending success!")
}
}
}
2、随后,该topic的消息副本会被发送到所有的channel上,供各个worker消费。
3、只要连接上nsqd的消费者就可以消费某个topic的信息。实例代码:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
)
type Consumer struct {
}
// NewConsumer的AddHandle方法接收的是一个接口,该接口含有HandleMessage方法,并返回error。
func (c *Consumer) HandleMessage(message *nsq.Message) error {
fmt.Println("msg:",string(message.Body))
return nil
}
func main() {
config := nsq.NewConfig()
//创建了一个消费者,topic是testnsqpub,channel是test_channel.
consumer,err := nsq.NewConsumer("testnsqpub","test_channel",config)
if err!=nil {
fmt.Println("err when new consumer:",err)
}
consumer.AddHandler(&Consumer{})
//连到nsqlookupd上,而不是直接连接到nsqd上。这事先了动态的需求。
if err:= consumer.ConnectToNSQLookupd("172.17.64.119:4161");err != nil {
fmt.Println("error when connect Lookupd")
} else {
fmt.Println("connect to lookupd success")
}
<- consumer.StopChan
}
现在如果运行consumer和producer就会得到我想要的结果。
我认为到目前,肯定还是很有些云里雾里的,channel是用来做什么的?topic是干什么的?至少我学习的时候是这样的。
- Topic:它是消息发布的key,key-value,哈哈。通过topic来找到对应的消息。初次发布消息的时候,如果topic不存在,就会创建;
这里可以拿出NSQD的源码,它是一个结构体。
type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64
.......
topicMap map[string]*Topic
......
}
NSQD结构体包含了一个Map,topicmap。
2.channel:channel主要和消费者有关,当我们创建消费者之后,会带上channel信息,即接收哪个channel的消息。消费者第一次订阅的时候会创建channel。那假如现在某每个nsqd有多个订阅的消费者,每次发布消息的时候,都会将消息副本发送到所有的channel上。channel对消息进行排队,类似队列,如果没有消费者读取的话,就存在内存中。消费者会在某一个channel上读取该消息。
微信分享/微信扫码阅读