nsqd源码学习
今天学习了一下NSQD的源码,大致了解了其工作流程。
在上一篇:结合Go聊聊nsq 介绍了nsq,知道了nsqd是接收,转发消息的进程,那他是如何接收,转发消息的呢?
首先看一下NSQD的字段:
type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64
sync.RWMutex
opts atomic.Value
dl *dirlock.DirLock
isLoading int32
errValue atomic.Value
startTime time.Time
topicMap map[string]*Topic
lookupPeers atomic.Value
tcpListener net.Listener
httpListener net.Listener
httpsListener net.Listener
tlsConfig *tls.Config
poolSize int
notifyChan chan interface{}
optsNotificationChan chan struct{}
exitChan chan int
waitGroup util.WaitGroupWrapper
ci *clusterinfo.ClusterInfo
}
我认为最最重要的有几个字段:
- topicMap 从字面就能发现,它维护了一个Topic的Map;
- tcplistener 这是最重要的了,是nsqd和外界客户端沟通的桥梁。所有消息的传递都是要经过该tcplistener;
- httplistener,提供了http的api访问,该server使用了httprouter路由。
- waitGroup,协程同步。nsq作者自己写了一个封装的用法,我感觉挺好的。如下代码:
一个同步的用法:
import "sync"
type WaitGroup struct {
sync.WaitGroup
}
func (w *WaitGroup) Wrap(cb func()) {
w.Add(1)
go func() {
defer w.Done()
cb()
}()
}
// 应用
w := WaitGroup{}
w.Wrap(func () {...})
好,现在就从nsqd接收到一个消息开始,到消费者接收截止说说。
1、首先tcplistener不断监听。
func (p *tcpServer) Handle(clientConn net.Conn) {
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
protocolMagic := string(buf)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
return
}
//prot是一个Protocol interface类型。
err = prot.IOLoop(clientConn)
}
上面最重要的就是IOLoop方法。它的作用是根据读取的内容,执行对应的方法。如Pub,Sub等命令。
func (p *protocolV2) IOLoop(conn net.Conn) error {
var err error
var line []byte
var zeroTime time.Time
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
for {
line, err = client.Reader.ReadSlice('\n')
line = line[:len(line)-1]
params := bytes.Split(line, separatorBytes)
var response []byte
response, err = p.Exec(client, params)
}
}
Exec函数就是根据传入的参数,来选择对应的方法,比如生产者执行了发布消息操作,那就会执行对应的Pub命令。那Exec内部肯定是含有switch....case语句的。
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
.....
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
}
}
现在nsqd收到了pub消息,那它首先是会根据topicName获取topic,如果没有的话就会新建一个Topic。
topic := p.ctx.nsqd.GetTopic(topicName)
func (n *NSQD) GetTopic(topicName string) *Topic {
// most likely, we already have this topic, so try read lock first.
n.RLock()
t, ok := n.topicMap[topicName]
n.RUnlock()
if ok {
return t
}
n.Lock()
t, ok = n.topicMap[topicName]
if ok {
n.Unlock()
return t
}
deleteCallback := func(t *Topic) {
n.DeleteExistingTopic(t.name)
}
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t
把消息推送到消息队列。首先它要推送到内存消息队列,如果满了,就将其推送到定义的磁盘队列。
现在Pub的消息都存在了消息队列中,此时其实还有一个协程,叫messagePump,它的主要作用是获取消息通道内的消息,然后发送给客户端。此外也用于发送心跳信息(用来确保连接正常)。
好,一切准备就绪,现在我就等消费者来消费我的信息了。
现在有一个client来订阅我了,并且发送了一个sub消息,该消息中含有channel名称,还有topicName。根据其内容,nsqd的messagePump就会读取对应的信息,并发送给客户端。
这个channel也是如果没有,就新建一个channel。
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)
client.Channel = channel
// update message pump
client.SubEventChan <- channel //开始消费该channel中的消息
return okBytes, nil
}
客户端收到nsqd发送的消息之后,是要发送确认消息的,是带有FIN的消息。
不知道说没说清楚,上面就是简单的流程。后续还得继续深入研究一下。
--------EOF---------
微信分享/微信扫码阅读
微信分享/微信扫码阅读