diskQueue
是backendQueue
接口的一个实现。backendQueue
的作用是在实现在内存go channel缓冲区满的情况下对消息的处理的对象。
除了diskQueue
外还有dummyBackendQueue
实现了backendQueue
接口。
对于临时(#ephemeral结尾)Topic/Channel,在创建时会使用dummyBackendQueue
初始化backend
, dummyBackendQueue
只是为了统一临时和非临时Topic/Channel而写的,它只是实现了接口,不做任何实质上的操作,
因此在内存缓冲区满时直接丢弃消息。这也是临时Topic/Channel和非临时的一个比较大的差别。
每个非临时Topic/Channel,创建的时候使用diskQueue
初始化backend
,diskQueue
的功能是将消息写入磁盘进行持久化,
并在需要时从中取出消息重新向客户端投递。
diskQueue
的实现在nsqd/disk_queue.go
中。需要注意一点,查找diskQueue
中的函数的调用可能不会返回正确的结果,
因为diskQueue
对外是以backendQueue
形式存在,因此查找diskQueue
的函数的调用情况时应当查找backendQueue
中相应函数的调用。
diskQueue
的创建和初始化
diskQueue
的获得是通过newDiskQueue
,该函数比较简单,通过传入的参数创建一个dispQueue
,
然后通过retrieveMetaData
函数获取之前与该diskQueue
相关联的Topic/Channel已经持久化的信息。最后启动ioLoop
循环处理消息。
retrieveMetaData
函数从磁盘中恢复diskQueue
的状态。diskQueue
会定时将自己的状态备份到文件中,
文件名由metaDataFileName
函数确定。retrieveMetaData
函数同样通过metaDataFileName
函数获得保存状态的文件名并打开。
该文件只有三行,格式为%d\n%d,%d\n%d,%d\n
,第一行保存着该diskQueue
中消息的数量(depth
),
第二行保存readFileNum
和readPos
,第三行保存writeFileNum
和writePos
。
这里不太理解的一个地方是d.depth
通过一个临时变量去获取然后通过atomic.StoreInt64
保存。个人觉得没有必要这么做。
当然作者在nsqd: diskqueue corruption and depth accounting这个Pull Request中也提到:
I dont believe that this should be strictly necessary because
retrieveMetaData
is only ever called inNewDiskQueue
and theioLoop
goroutine is launched after that call (which according to the go memory model is safe).However, I’m not 100% sure about interactions between the go memory model, go-routines, and the combined use of atomic and non-atomic operations (which is what this was relying on before this change… i.e. this was the only mutation of
d.depth
that was notusing atomic ops).
因此,这只是个比较保险的做法,并不一定意味着直接保存到d.depth
就不安全。
与retrieveMetaData
相对应的是persistMetaData
函数,这个函数将运行时的元数据保存到文件用于下次重新构建diskQueue
时的恢复。
逻辑基本与retrieveMetaData
,此处不再赘述。
diskQueue
的消息循环
ioLoop
函数实现了diskQueue
的消息循环,diskQueue
的定时操作和读写操作的核心都在这个函数中完成。
函数首先使用time.NewTicker(d.syncTimeout)
定义了syncTicker
变量,syncTicker
的类型是time.Ticker
,
每隔d.syncTimeout
时间就会在syncTicker.C
这个go channel产生一个消息。
通过select syncTicker.C
能实现至多d.syncTimeout
时间就跳出select块一次,这种方式相当于一个延时的default
子句。
在ioLoop
中,通过这种方式,就能在一个goroutine中既实现消息的接收又实现定时任务(跳出select后执行定时任务,然后在进入select)。
有点类似于定时的轮询。
ioLoop
的定时任务是调用sync
函数刷新文件,防止突然结束程序后内存中的内容未被提交到磁盘,导致内容丢失。
控制是否需要同步的变量是d.needSync
,该变量在一次sync
后会被置为false
,在许多需要刷新文件的地方会被置为true
。
在ioLoop
中,d.needSync
变量还跟刷新计数器count
变量有关,count
值的变化规则如下:
- 如果一次消息循环中,有写入操作,那么
count
就会被自增。 - 当
count
达到d.syncEvery
时,会将count
重置为0并且将d.needSync
置为true
,随后进行文件的刷新。 - 在
emptyChan
收到消息时,count
会被重置为0,因为文件已经被删除了,所有要重置刷新计数器。 - 在
syncTicker.C
收到消息后,会将count
重置为0,并且将d.needSync
置为true
。也就是至多d.syncTimeout
时间刷新一次文件。
ioLoop
还定时检测当前是否有数据需要被读取,如果(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos)
和`d.nextReadPos == d.readPos
这两个条件成立,则执行d.readOne()
并将结果放入dataRead
中,然后设置r
为d.readChan
。
如果条件不成立,则将r
置为空值nil
。随后的select语句中有case r <- dataRead:
这样一个分支,在注释中作者写了这是一个Golang的特性,
即:如果r
不为空,则会将dataRead
送入go channel。进入d.readChan
的消息通过ReadChan
函数向外暴露,最终被Topic/Channel的消息循环读取。
而如果r
为空,则这个分支会被跳过。这个特性的使用统一了select的逻辑,简化了当数据为空时的判断。
diskQueue
的写操作
写操作的对外接口是Put
函数,该函数比较简单,加锁,并且将数据放入d.writeChan
,等待d.writeResponseChan
的结果后返回。 d.writeChan
的接收在ioLoop
中select的一个分支,处理时调用writeOne
函数,并将处理结果放入d.writeResponseChan
。
writeOne
函数是写操作的最终执行部分,负责将消息写入磁盘。函数逻辑比较简单。消息写入步骤如下:
- 若当前要写的文件不存在,则通过
d.fileName(d.writeFileNum)
获得文件名,并创建文件 - 根据
d.writePos
定位本次写的位置 - 从要写入的内容得到要写入的长度
- 先写入3中计算出的消息长度(4字节),然后写入消息本身
- 将
d.writePos
后移4 + 消息长度
作为下次写入位置。加4是因为消息长度本身也占4字节。 - 判断
d.writePos
是否大于每个文件的最大字节数d.maxBytesPerFile
,如果是,则将d.writeFileNum
加1,
并重置d.writePos
。这个操作的目的是为了防止单个文件过大。 - 如果下次要写入新的文件,那么需要调用
sync
函数对当前文件进行同步。
diskQueue
的读操作
消息读取对外暴露的是一个go channel,而数据的最终来源是ioLoop
中调用的readOne
函数。readOne
函数逻辑跟writeOne
类似,
只是把写操作换成了读操作,唯一差异较大的地方是d.nextReadPos
和d.nextReadFileNum
这两个变量的使用。
在写操作时,如果写入成功,则可以直接将写入位置和写入文件更新。但是对于读操作来说,由于读取的目的是为了向客户端投递,
因此无法保证一定能投递成功。因此需要使用next开头的两个变量来保存成功后需要读的位置,如果投递没有成功,
则继续使用当前的读取位置将再一次尝试将消息投递给客户端。
当消息投递成功后,则使用moveForward
函数将保存在d.nextReadPos
和d.nextReadFileNum
中的值取出,
赋值给d.readPos
和d.readFileNum
,moveForward
函数还负责清理已经读完的旧文件。最后,调用checkTailCorruption
函数检查文件是否有错,
如果出现错误,则调用skipToNextRWFile
重置读取和写入的文件编号和位置。
diskQueue
中的其他函数
diskQueue
中还有与错误处理相关的handleReadError
,与关闭diskQueue
相关的Close
,Delete
,exit
,Empty
和deleteAllFiles
等,
函数,逻辑较简单,不再专门分析。
diskQueue
总结
diskQueue
主要逻辑是对磁盘的读写操作,较为琐碎但没有复杂的架构。
其中消息循环的思路和读写过程周全的考虑都值得学习的。
nsq源码阅读笔记之nsqd(三)——diskQueue的更多相关文章
-
nsq源码阅读笔记之nsqd(四)——Channel
与Channel相关的代码主要位于nsqd/channel.go, nsqd/nsqd.go中. Channel与Topic的关系 Channel是消费者订阅特定Topic的一种抽象.对于发往Topi ...
-
nsq源码阅读笔记之nsqd(一)——nsqd的配置解析和初始化
配置解析 nsqd的主函数位于apps/nsqd.go中的main函数 首先main函数调用nsqFlagset和Parse进行命令行参数集初始化, 然后判断version参数是否存在,若存在,则打印 ...
-
nsq源码阅读笔记之nsqd(二)——Topic
与Topic相关的代码主要位于nsqd/nsqd.go, nsqd/topic.go中. Topic的获取 Topic通过GetTopic函数获取 GetTopic函数用于获取topic对象,首先先尝 ...
-
CI框架源码阅读笔记5 基准测试 BenchMark.php
上一篇博客(CI框架源码阅读笔记4 引导文件CodeIgniter.php)中,我们已经看到:CI中核心流程的核心功能都是由不同的组件来完成的.这些组件类似于一个一个单独的模块,不同的模块完成不同的功 ...
-
CI框架源码阅读笔记2 一切的入口 index.php
上一节(CI框架源码阅读笔记1 - 环境准备.基本术语和框架流程)中,我们提到了CI框架的基本流程,这里再次贴出流程图,以备参考: 作为CI框架的入口文件,源码阅读,自然由此开始.在源码阅读的过程中, ...
-
源码阅读笔记 - 1 MSVC2015中的std::sort
大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格 ...
-
libevent源码阅读笔记(一):libevent对epoll的封装
title: libevent源码阅读笔记(一):libevent对epoll的封装 最近开始阅读网络库libevent的源码,阅读源码之前,大致看了张亮写的几篇博文(libevent源码深度剖析 h ...
-
jdk源码阅读笔记-LinkedHashMap
Map是Java collection framework 中重要的组成部分,特别是HashMap是在我们在日常的开发的过程中使用的最多的一个集合.但是遗憾的是,存放在HashMap中元素都是无序的, ...
-
faster rcnn源码阅读笔记1
自己保存的源码阅读笔记哈 faster rcnn 的主要识别过程(粗略) (开始填坑了): 一张3通道,1600*1600图像输入中,经过特征提取网络,得到100*100*512的feature ma ...
随机推荐
-
Discuz! X论坛上传附件到100%自动取消上传的原因及解决方案
最近接到一些站长的反馈,说论坛上传附件,到100%的时候自己取消上传了.经查是附件索引表pre_forum_attachment表的aid字段自增值出现了问题,导致程序逻辑返回的aid值实际为一个My ...
-
BADI
BADI_MATERIAL_CHECK BOM_UPDATE MB_MIGO_BADI ME_GUI_PO_CUST ME_PROCESS_PO_CUST ME_REQ_POSTED WORKORDE ...
-
iOS RSA加密解密及签名验证
1.首先要下载openssl,这个不用说,直接官网下载或者用brew install openssl下载 2.终端生成私钥密钥 2.1生成私钥 openssl genrsa - 2.2生成密钥 ope ...
-
MySQL引擎介绍ISAM,MyISAM,HEAP,InnoDB
MySQL数据库引擎取决于MySQL在安装的时候是如何被编译的.要添加一个新的引擎,就必须重新编译MYSQL. 在缺省情况下,MYSQL支持三个引擎:ISAM.MYISAM和HEAP.另外两种类型IN ...
-
spring-oauth-server实践:授权方式三:PASSWORD模式下 authorities:ROLE_{user.privillege}, ROLE_USER
一.数据库配置 1.oauth_client_details 2.user_ 3.user_privillege 二.password模式 授权过程 1.授权者granter和请求参数 Resourc ...
-
从零开始学spring cloud(七) -------- Spring Cloud OpenFegin
一.OpenFegin 介绍 Feign是一个声明性的Web服务客户端. 它使编写Web服务客户端变得更容易. 要使用Feign,请创建一个界面并对其进行注释. 它具有可插入的注释支持,包括Feign ...
-
C#中的readonly跟const用法小结
总结一下常量和只读字段的区别: 由来: 笔者也是在看欧立奇版的<.Net 程序员面试宝典>的时候,才发现自己长久以来竟然在弄不清出两者的情况下,混用了这么长的时间.的确,const与rea ...
-
键值对Dictionary、KeyValuePair、Hashtable 简单使用。
KeyValuePair是单个的键值对对象.KeyValuePair可用于接收combox选定的值. 例如:KeyValuePair<string, object> par = (KeyV ...
-
腾讯云ubuntu安装Mysql并配置远程访问
转载请注明原文地址:http://www.cnblogs.com/ygj0930/p/6378914.html 一:修改SSH配置 输入 su 进入root模式.修改ssh配置: sudo vi /e ...
-
IDEA使用技巧:CamelCasePlugin插件
CamelCasePlugin是一款可以快速进行格式转换的工具,较常用到的是大小写转换.驼峰式转换等. 1.打开idea,然后打开设置.点击Plugins 2.快捷键shift+alt+u