nsqd源码学习

今天学习了一下NSQD的源码,大致了解了其工作流程。

在上一篇:结合Go聊聊nsq 介绍了nsq,知道了nsqd是接收,转发消息的进程,那他是如何接收,转发消息的呢?

首先看一下NSQD的字段:

type NSQD struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	clientIDSequence int64

	sync.RWMutex

	opts atomic.Value

	dl        *dirlock.DirLock
	isLoading int32
	errValue  atomic.Value
	startTime time.Time

	topicMap map[string]*Topic

	lookupPeers atomic.Value

	tcpListener   net.Listener
	httpListener  net.Listener
	httpsListener net.Listener
	tlsConfig     *tls.Config

	poolSize int

	notifyChan           chan interface{}
	optsNotificationChan chan struct{}
	exitChan             chan int
	waitGroup            util.WaitGroupWrapper

	ci *clusterinfo.ClusterInfo
}

我认为最最重要的有几个字段:

  • topicMap   从字面就能发现,它维护了一个Topic的Map;
  • tcplistener  这是最重要的了,是nsqd和外界客户端沟通的桥梁。所有消息的传递都是要经过该tcplistener;
  • httplistener,提供了http的api访问,该server使用了httprouter路由。
  • waitGroup,协程同步。nsq作者自己写了一个封装的用法,我感觉挺好的。如下代码:

一个同步的用法:

import "sync"

type WaitGroup struct {
	sync.WaitGroup
}

func (w *WaitGroup) Wrap(cb func()) {
	w.Add(1)
	go func() {
		defer w.Done()
		cb()
	}()
}

// 应用
w := WaitGroup{}
w.Wrap(func () {...})

好,现在就从nsqd接收到一个消息开始,到消费者接收截止说说。

1、首先tcplistener不断监听。

func (p *tcpServer) Handle(clientConn net.Conn) {
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	protocolMagic := string(buf)
	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		return
	}
        //prot是一个Protocol interface类型。
	err = prot.IOLoop(clientConn)
	
}

上面最重要的就是IOLoop方法。它的作用是根据读取的内容,执行对应的方法。如Pub,Sub等命令。

func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time
	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan
	for {
		line, err = client.Reader.ReadSlice('\n')
		line = line[:len(line)-1]
		params := bytes.Split(line, separatorBytes)
		var response []byte
		response, err = p.Exec(client, params)
        }
}

Exec函数就是根据传入的参数,来选择对应的方法,比如生产者执行了发布消息操作,那就会执行对应的Pub命令。那Exec内部肯定是含有switch....case语句的。

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	.....
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
		return p.REQ(client, params)
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
        }
}

现在nsqd收到了pub消息,那它首先是会根据topicName获取topic,如果没有的话就会新建一个Topic。

topic := p.ctx.nsqd.GetTopic(topicName)
func (n *NSQD) GetTopic(topicName string) *Topic {
	// most likely, we already have this topic, so try read lock first.
	n.RLock()
	t, ok := n.topicMap[topicName]
	n.RUnlock()
	if ok {
		return t
	}

	n.Lock()

	t, ok = n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	t = NewTopic(topicName, &context{n}, deleteCallback)
	n.topicMap[topicName] = t

把消息推送到消息队列。首先它要推送到内存消息队列,如果满了,就将其推送到定义的磁盘队列。

现在Pub的消息都存在了消息队列中,此时其实还有一个协程,叫messagePump,它的主要作用是获取消息通道内的消息,然后发送给客户端。此外也用于发送心跳信息(用来确保连接正常)。

好,一切准备就绪,现在我就等消费者来消费我的信息了。

现在有一个client来订阅我了,并且发送了一个sub消息,该消息中含有channel名称,还有topicName。根据其内容,nsqd的messagePump就会读取对应的信息,并发送给客户端。

这个channel也是如果没有,就新建一个channel。

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	topic := p.ctx.nsqd.GetTopic(topicName)
	channel := topic.GetChannel(channelName)
	channel.AddClient(client.ID, client)
	client.Channel = channel
	// update message pump
	client.SubEventChan <- channel //开始消费该channel中的消息
	return okBytes, nil
}

客户端收到nsqd发送的消息之后,是要发送确认消息的,是带有FIN的消息。

 

不知道说没说清楚,上面就是简单的流程。后续还得继续深入研究一下。

 

--------EOF---------
微信分享/微信扫码阅读