83374290 2019-11-04
本文原地址 https://blog.lpflpf.cn/passages/nsqd-study-2/
NSQ 消息队列实现消息落地使用的是 FIFO 队列。
实现为 diskqueue , 使用包github.com/nsqio/go-diskqueue
,本文主要对diskqueue
的实现做介绍。
本文代码来自于github.com/nsqio/go-diskqueue
实现的功能是一个FIFO的队列,实现如下功能:
diskqueue 是 BackendQueue (一个队列所需要的接口) 的实现, 接口定义如下:
type BackendQueue interface { Put([]byte) error // 将一条消息插入到队列中 ReadChan() chan []byte // 返回一个无缓冲的chan Close() error // 队列关闭 Delete() error // 删除队列 (实际在实现时,数据仍保留) Depth() int64 // 返回读延迟的消息量 Empty() error // 清空消息 (实际会删除所有的记录文件) }
对于需要原子操作的64bit 的字段,需要放在struct 的最前面,原因请看学习总结第一条
数据结构中定义了 文件的读写位置、一些文件读写的控制变量,以及相关操作的channel.
// diskQueue implements a filesystem backed FIFO queue type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 // 读的位置 writePos int64 // 写的位置 readFileNum int64 // 读文件的编号 writeFileNum int64 // 写文件的编号 depth int64 // 读写文件的距离 (用于标识队列的长度) sync.RWMutex // instantiation time metadata name string // 标识队列名称,用于落地文件名的前缀 dataPath string // 落地文件的路径 maxBytesPerFile int64 // 每个文件最大字节数 minMsgSize int32 // 单条消息的最小大小 maxMsgSize int32 // 单挑消息的最大大小 syncEvery int64 // 每写多少次刷盘一次 syncTimeout time.Duration // 至少多久会刷盘一次 exitFlag int32 // 退出标识 needSync bool // 如果 needSync 为true, 则需要fsync刷新metadata 数据 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 // 下一次读的位置 nextReadFileNum int64 // 下一次读的文件number readFile *os.File // 读 fd writeFile *os.File // 写 fd reader *bufio.Reader // 读 buffer writeBuf bytes.Buffer // 写 buffer // exposed via ReadChan() readChan chan []byte // 读channel // internal channels writeChan chan []byte // 写 channel writeResponseChan chan error // 同步写完之后的 response emptyChan chan int // 清空文件的channel emptyResponseChan chan error // 同步清空文件后的channel exitChan chan int // 退出channel exitSyncChan chan int // 退出命令同步等待channel logf AppLogFunc // 日志句柄 }
初始化一个队列,需要定义前缀名, 数据路径,每个文件的最大字节数,消息最大最小限制,以及刷盘频次和最长刷盘时间,最后还有一个日志函数
func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { d := diskQueue{ name: name, dataPath: dataPath, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), writeChan: make(chan []byte), writeResponseChan: make(chan error), emptyChan: make(chan int), emptyResponseChan: make(chan error), exitChan: make(chan int), exitSyncChan: make(chan int), syncEvery: syncEvery, syncTimeout: syncTimeout, logf: logf, } // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) } go d.ioLoop() return &d }
可以看出, 队列中均使用不带cache 的chan,消息只能阻塞处理。
d.retrieveMetaData()
是从文件中恢复元数据。
d.ioLoop()
是队列的事件处理逻辑,后文详细解答
文件名 "name" + .diskqueue.%06d.dat
其中, name 是 topic, 或者topic + channel 命名.
数据采用二进制方式存储, 消息大小+ body 的形式存储。
ioLoop 函数,做所有时间处理的操作,包括:
func (d *diskQueue) ioLoop() { var dataRead []byte var err error var count int64 var r chan []byte // 定时器的设置 syncTicker := time.NewTicker(d.syncTimeout) for { // 若到达刷盘频次,标记等待刷盘 if count == d.syncEvery { d.needSync = true } if d.needSync { err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } count = 0 } // 有可读数据,并且当前读chan的数据已经被读走,则读取下一条数据 if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) d.handleReadError() continue } } r = d.readChan } else { // 如果无可读数据,那么设置 r 为nil, 防止将dataRead数据重复传入readChan中 r = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed // 如果读chan 被写入成功,则会修改读的偏移 d.moveForward() case <-d.emptyChan: // 清空所有文件,并返回empty的结果 d.emptyResponseChan <- d.deleteAllFiles() count = 0 case dataWrite := <-d.writeChan: // 写msg count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: // 到刷盘时间,则修改needSync = true if count == 0 { // avoid sync when there's no activity continue } d.needSync = true case <-d.exitChan: goto exit } } exit: d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name) syncTicker.Stop() d.exitSyncChan <- 1 }
需要注意的点:
文件名: "name" + .diskqueue.meta.dat
其中, name 是 topic, 或者topic + channel 命名.
metadata 数据包含5个字段, 内容如下:
depth\nreadFileNum,readPos\nwriteFileNum,writePos
当服务关闭后,metadata 数据将保存在文件中。当服务再次启动时,将从文件中将相关数据恢复到内存中。
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
原因 原因在golang 源码 sync/atomic/doc.go 中
// On ARM, x86-32, and 32-bit MIPS, // it is the caller's responsibility to arrange for 64-bit // alignment of 64-bit words accessed atomically. The first word in a // variable or in an allocated struct, array, or slice can be relied upon to be // 64-bit aligned.
buffer_pool.go
文件中, 简单实现了 bytes.Buffer 的对象池,减少了gc 压力