nsq 源码分析-1
前言
前几周在学习 raft,但由于 raft 需要耗费的周期比较长,打算等到大四了再好好学一学论文和啃一啃 raft 源码(其实我已经把 raft 的 log 层、log 存储、process 等源码啃的差不多了,剩下最难啃的 raft 层和 node 层了。。。),最近不搞这个了,先认真备战秋招。
这两周学习 nsq,是因为之前在公司用到消息队列的场景还挺多的(客户端打点,日志等之类的都会用到),而我基本没学过 java,看不了 kafka,所以把用 go 编写的 nsq 拿来啃一啃。
我看的是 nsq 的早期版本,想借此来循序渐进。感兴趣的可以在这里下载:nsq-github
1 | tree --dirsfirst -L 1 -I '*test*' -P '*.go' |
通过命令可以看到一共 13 个文件。下面就通过一个消息从消费者通过 http->nsq->consummer 讲解一条消息在 nsq 中的移动过程。下一张详细讲解每个文件中主要函数的作用。敬请期待~
msg 移动过程
首先说明一点,nsg 中只要涉及到 channel 的,都是协程和协程之间的通信。nsq 启动时会启动很多协程的。
start
首先,消费者通过调用 http 接口进入代码:
获取到传入的参数后,通过调用 GetTopic 获取到此 topicName 对应的 topic。
GetTopic 这个函数写的还是比较有趣的,在新建了一个 interface 型的变量之后,将 topicName 和 topicChan 传给 newTopicChan 后等待此 interface 变量发送返回的消息。
这个是 newTopic 的定义:
newTopic 接收到消息后,查看这个 topicName 是否在 topicMap 中,如果存在,取出 topic 给上面的 channel 发送消息,如果不在,将新建 key-value 并存入 map 中,并将此 topic 传给的 channel。
获取到 topic 后返回,调用topic.PutMessage(NewMessage(buf.Bytes()))
,将 msg 传给 topic 的 incomingMessageChan。
接下来会在这里调用:
如果执行的是 default,则放入 topic 对应的 queue 中(topic 的 queue 是消息过多而设置的,channel 中存的是每一个消费者需要消费的消息,两者有着本质的区别,虽然底层 queue 代码一样),然后存起来。等到合适的时机在从队列取出,然后将消息存入 channel 中的 queue 中(和下面执行几乎一样的代码)。
否则将消息送入 msgChan:
将 msg 存入 channel 对应的 queue 中:
如果是 default 类似于上面的,存入 channel 的 queue 中(然后等适当时机从下图的 ReadReadyChan 出来),否则进入 msgchan:
执行c.RequeueMessage(UuidToStr(msg.Uuid()))
:
将消息取出后又放回去。是将消息放入队列末尾了,因为 nsq 并不像其他消息队列一样需要保证消息的顺序,这也是 nsq 速度快的一点原因。
但是正如《隐秘的角落一样》,这样一直循环是不是毫无意义,那么是在什么时候发送给的消费者?
答案就在 nsq/channel 的 GetMessage 中:
这个函数执行完之后会 break,向上查找可知是 protocol 调用:
但是这个 GET 函数并没有被调用过,此时联想到 go 的反射机制不难推出答案:
消费者在 IOLoop 中通过 body 发现客户端的 cmd 是 get 请求,通过反射调用 get 函数后,等待消息的到来。
还有两个细节需要注意一下:
- 生产者发送时的格式为:http 头 | body,http 头中有 topic 信息,body 中的都是对应的消息。
- 发送者发送的消息格式为:版本号(4字节,区分唯一的 Protocol),cmd\r\n cmd\r\n。
- 发送者接受的消息格式应该为:uuid(唯一区分 msg 用的) + body。
至此流程基本推导完毕。