前言
介绍
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.0-rc5。本文档是对4.0.0-rc5分支代码进行分析。
目的
对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:
- 消息队列NetMQ 原理分析1-Context和ZObject
- 消息队列NetMQ 原理分析2-IO线程和完成端口
- 消息队列NetMQ 原理分析3-命令产生/处理和回收线程
- 消息队列NetMQ 原理分析4-Session和Pipe
- 消息队列NetMQ 原理分析5-Engine
- 消息队列NetMQ 原理分析6-TCP和Inpoc实现
- 消息队列NetMQ 原理分析7-Device
- 消息队列NetMQ 原理分析8-不同类型的Socket
- 消息队列NetMQ 原理分析9-实战
友情提示: 看本系列文章时最好获取源码,更有助于理解。
IO线程
NetMQ 4.0.0底层使用的是IOCP(即完成端口)模式进行通信的(3.3.4使用的是select模型),通过异步IO绑定到完成端口,来最大限度的提高性能。这里不对同步/异步socket进行详细介绍。稍微解释下完成端口,为了解决每个socket客户端使用一个线程进行通信的性能问题,完成端口它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能。
想详细了解完成端口的请看完成端口(Completion Port)详解,讲解的比较详细,同时对各种网络编程模型做了简单的介绍。
因此NetMQ通过几个(默认1个)IO线程处理通信,上一片文章介绍了ZObejct对象,在该对象中存在许多命令的处理,实际对命令的发送,分配都是IO线程的工作。
初始化IO线程
IO线程初始化时会初始化Proactor
和IOThreadMailbox
var name = "iothread-" + threadId;
m_proactor = new Proactor(name);
m_mailbox = new IOThreadMailbox(name, m_proactor, this);
Proactor
对象就是用来绑定或处理完成端口用的,后面再做作详细介绍。IOThreadMailbox
是IO线程处理的信箱,每当有命令需要处理时,都会向当前Socket
对象所在的IO线程信箱发送命令。
让我们看一眼IOThread
对象和IOThreadMailbox
的定义
internal sealed class IOThread : ZObject, IMailboxEvent
{
}
IOThread
对象继承自ZObject
对象,记得上一节想到ZObject对象知道如何处理各种命令吗?因此IOThread
对象也继承了他父亲的技能。同时IOThread
对象实现了IMailboxEvent
接口,这个接口之定义了一个方法。
internal interface IMailboxEvent
{
void Ready();
}
当IO信箱接受到命令时表示当前有命令准备好了,可以进行 处理,IO信箱则会调用IO线程的Ready方法处理命令,那么IO信息如何调用IO线程的Ready方法呢,来看下IOThreadMailbox
的构造函数。
internal class IOThreadMailbox : IMailbox
{
...
public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent)
{
m_proactor = proactor;
m_mailboxEvent = mailboxEvent;
Command cmd;
bool ok = m_commandPipe.TryRead(out cmd);
}
...
}
在IOThreadMailbox初始化时,传入了IMailboxEvent。
m_commandPipe是NetMQ的管道(Pipe),后面我们会对其做介绍,这里只要知道该管道用于存放命令即可,可以__暂时__理解为管道队列。
Proactor
每个IOThread
会有一个Proactor
,Proactor
的工作就是将Socket
对象绑定到完成端口,然后定时去扫描完成端口是否有需要处理的Socket
对象。
internal class Proactor : PollerBase
{
...
public Proactor([NotNull] string name)
{
m_name = name;
m_stopping = false;
m_stopped = false;
m_completionPort = CompletionPort.Create();
m_sockets = new Dictionary<AsyncSocket, Item>();
}
...
}
Proactor
对象继承自PollerBase
,那么PollerBase
又是什么呢?从命名可以看这是一个轮询基类,即该对象需要长时间不断循环处理某件事情。PollerBase
对象是一个抽象类,它有2个功能:
-
负载均衡
还记的Context中选择IO线程时有这个一段代码吗?
IO线程的负载均衡功能就是PollBase对象提供的每次选择IO线程时会将
m_load
字段值+1protected void AdjustLoad(int amount)
{
Interlocked.Add(ref m_load, amount);
}public int Load
{
get
{
#if NETSTANDARD1_3
return Volatile.Read(ref m_load);
#else
Thread.MemoryBarrier();
return m_load;
#endif
}
}在
IOThread
取PollBase对象(Proactor)的Load属性时候会特殊处理,保证拿到的是最新的值。 -
定时任务
PollBase第二个功能就是支持定时任务,即定时触发某事件。private readonly SortedList<long, List<TimerInfo>> m_timers;
PollBase内部有一个
SortedList
,key为任务执行的时间,value为TimeInfo
。TimeInfo
对象包含2个信息,id
和ITimerEvent
接口,id
用来辨别当前任务的类型,ITimerEvent
接口就包含了TimerEvent
方法,即如何执行。
如TcpConnection
连接失败会重新连接时会重连,下面时TcpConnection
开始连接方法private void StartConnecting()
{
Debug.Assert(m_s == null); // Create the socket.
try
{
m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
}
catch (SocketException)
{
AddReconnectTimer();
return;
}
...
}
private void AddReconnectTimer()
{
//获取重连时间间隔
int rcIvl = GetNewReconnectIvl();
//IO线程的Proactor中,TcpConnection的ReconnectTimerId = 1
m_ioObject.AddTimer(rcIvl, ReconnectTimerId);
...
}IO线程会被封装到
IOObject
中,调用IOObject
的AddTimer
方法实际就是调用IO线程中Proactor
对象的AddTimer
方法,其方法定义如下public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id)
{
long expiration = Clock.NowMs() + timeout;
var info = new TimerInfo(sink, id); if (!m_timers.ContainsKey(expiration))
m_timers.Add(expiration, new List<TimerInfo>()); m_timers[expiration].Add(info);
}第一行会获取当前的毫秒时间加上时间间隔。然后加入到
m_timers
中。
m_completionPort = CompletionPort.Create();
m_sockets = new Dictionary<AsyncSocket, Item>();
初始化时会创建完成端口,当有socket需要处理时会和完成端口绑定。
初始化时还会初始化一个存放异步AsyncSocket
和item
的字典。
有关于AsyncSocket
和CompletionPort
可以去Git上看AsyncIO的源码,这里不做分析。Item
结构如下
private class Item
{
public Item([NotNull] IProactorEvents proactorEvents)
{
ProactorEvents = proactorEvents;
Cancelled = false;
}
[NotNull]
public IProactorEvents ProactorEvents { get; }
public bool Cancelled { get; set; }
}
它包含了IProactorEvents
接口的信息和当前Socket
操作是否被取消标志。
internal interface IProactorEvents : ITimerEvent
{
void InCompleted(SocketError socketError, int bytesTransferred);
void OutCompleted(SocketError socketError, int bytesTransferred);
}
IProactorEvents
继承自ITimerEvent
。同时它还声明了InCompleted
和OutCompleted
方法,即发送或接收完成时如何处理,因此当需要处理Socket
时,会将当前Socket
处理方式保存到这个字典中。当当前对象发送消息完成,则会调用OutCompleted
方法,接收完成时则会调用InCompleted
方法。
当有Socket
需要绑定时会调用Proactor
的AddSocket
方法
public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents)
{
var item = new Item(proactorEvents);
m_sockets.Add(socket, item);
m_completionPort.AssociateSocket(socket, item);
AdjustLoad(1);
}
它包含2个参数,一个时异步Socket
对象和IProactorEvents
。然后加把他们加入到字段中并将他们绑定到完成端口上。第四段AdjustLoad
方法即把当前IO线程处理数量+1,用于负载均衡用。
当Socket
操作完成时会调用Proactor
的RemoveSocket
移除绑定
public void RemoveSocket(AsyncSocket socket)
{
AdjustLoad(-1);
var item = m_sockets[socket];
m_sockets.Remove(socket);
item.Cancelled = true;
}
移除时会将item
的Cancelled
字段设置为true
。所以当Proactor
轮询处理Socket
时发现该Socket
操作被取消(移除),就会跳过处理。
启动Procator线程轮询
在IO线程启动时实际就是启动Procator的work线程
public void Start()
{
m_proactor.Start();
}
public void Start()
{
m_worker = new Thread(Loop) { IsBackground = true, Name = m_name };
m_worker.Start();
}
处理socket
完整的Loop
方法如下
private void Loop()
{
var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
while (!m_stopping)
{
// Execute any due timers.
int timeout = ExecuteTimers();
int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
continue;
for (int i = 0; i < removed; i++)
{
try
{
if (completionStatuses[i].OperationType == OperationType.Signal)
{
var mailbox = (IOThreadMailbox)completionStatuses[i].State;
mailbox.RaiseEvent();
}
// if the state is null we just ignore the completion status
else if (completionStatuses[i].State != null)
{
var item = (Item)completionStatuses[i].State;
if (!item.Cancelled)
{
switch (completionStatuses[i].OperationType)
{
case OperationType.Accept:
case OperationType.Receive:
item.ProactorEvents.InCompleted(
completionStatuses[i].SocketError,
completionStatuses[i].BytesTransferred);
break;
case OperationType.Connect:
case OperationType.Disconnect:
case OperationType.Send:
item.ProactorEvents.OutCompleted(
completionStatuses[i].SocketError,
completionStatuses[i].BytesTransferred);
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
catch (TerminatingException)
{ }
}
}
}
var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
第一行初始化了CompletionStatus
数组,CompletionStatusArraySize
值为100。CompletionStatus
作用是用来保存socket的信息或状态。
获取超时时间
int timeout = ExecuteTimers();
protected int ExecuteTimers()
{
if (m_timers.Count == 0)
return 0;
long current = Clock.NowMs();
var keys = m_timers.Keys;
for (int i = 0; i < keys.Count; i++)
{
var key = keys[i];
if (key > current)
{
return (int)(key - current);
}
var timers = m_timers[key];
foreach (var timer in timers)
{
timer.Sink.TimerEvent(timer.Id);
}
timers.Clear();
m_timers.Remove(key);
i--;
}
return 0;
}
ExecuteTimers
会计算之前加入到m_timers
需要等待的超时时间,若没有对象则直接返回0,否则获取若获取到key时间在当前时间之前,则需要调用TimerEvent
方法,调用完成后移除。
若获取到的key时间比当前时间大,则返回他们的差即为需要等待的超时时间。
从完成端口获取处理完的状态
int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
continue;
GetMultipleQueuedCompletionStatus
方法传入一个超时时间,若前面获取的超时时间为0,则这边会设置为-1,表示阻断直到有要处理的才返回。CompletionPort
内部维护了一个状态队列,removed
即为处理完成返回的状态个数。
若获取成功则会返回true
,后面就开始遍历completionStatuses
数组处理完成Socket
。
开始处理待处理的状态
public struct CompletionStatus
{
internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) :
this()
{
AsyncSocket = asyncSocket;
State = state;
OperationType = operationType;
SocketError = socketError;
BytesTransferred = bytesTransferred;
}
public AsyncSocket AsyncSocket { get; private set; }
public object State { get; internal set; }
public OperationType OperationType { get; internal set; }
public SocketError SocketError { get; internal set; }
public int BytesTransferred { get; internal set; }
}
CompletionStatus
是个结构体,它包含的信息如上。其中OperationType
是当前Socket
的处理方式。
public enum OperationType
{
Send, Receive, Accept, Connect, Disconnect, Signal
}
在for
循环的一开始先会判断当前状态的OperationType
,若是Signal,则说明当前是个信号状态,说明有命令需要处理,则会调用IO信箱的RaiseEvent
方法,实际为IO线程的Ready
方法。
public void Ready()
{
Command command;
while (m_mailbox.TryRecv(out command))
command.Destination.ProcessCommand(command);
}
IOThread
会将当前信箱的所有命令进行处理。
若不是Signal
则会将CompletionStatus
保存的状态信息转换为Item
对象,并判断当前Socket
是否移除(取消)。若没有则对其进行处理。判断OperationType
,若为Accept
或Receive
则表示需要接收,则调用InCompleted
方法。若为Connect
,Disconnect
或Send
则表示有消息向外发送,则调用OutCompleted
方法。
至此IOThread
代码分析完毕。
IOObject
internal class IOObject : IProactorEvents
{
public IOObject([CanBeNull] IOThread ioThread)
{
if (ioThread != null)
Plug(ioThread);
}
public void Plug([NotNull] IOThread ioThread)
{
Debug.Assert(ioThread != null);
m_ioThread = ioThread;
}
}
IOObject
实际就是保存了IOThread
的信息和Socket
处理完成时如何执行,以及向外暴露了一些接口。
再次说明,如果向简单了解完成端口如何使用,则看《完成端口使用》,如果想详细了解完成端口则看下《完成端口详细介绍》,如果想直到NetMQ的AsyncIO和完成端口的源码请看AsyncIO。
总结
该篇介绍了IO线程和完成端口的处理方式,若哪里分析的不到位或有误希望支出。
微信扫一扫二维码关注订阅号杰哥技术分享
本文地址:https://www.cnblogs.com/Jack-Blog/p/6347163.html
作者博客:杰哥很忙
欢迎转载,请在明显位置给出出处及链接)
消息队列NetMQ 原理分析2-IO线程和完成端口的更多相关文章
-
消息队列NetMQ 原理分析3-命令产生/处理和回收线程
消息队列NetMQ 原理分析3-命令产生/处理和回收线程 前言 介绍 目的 命令 命令结构 命令产生 命令处理 创建Socket(SocketBase) 创建连接 创建绑定 回收线程 释放Socket ...
-
消息队列NetMQ 原理分析1-Context和ZObject
前言 介绍 NetMQ是ZeroMQ的C#移植版本,它是对标准socket接口的扩展.它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问. 当前有2个版本正在维护,版本3 ...
-
消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
消息队列NetMQ 原理分析4-Socket.Session.Option和Pipe 前言 介绍 目的 Socket 接口实现 内部结构 Session Option Pipe YPipe Msg Y ...
-
消息队列NetMQ 原理分析5-StreamEngine、Encord和Decord
消息队列NetMQ 原理分析5-StreamEngine,Encord和Decord 前言 介绍 目的 StreamEngine 发送数据 接收数据 流程分析 Encoder V2Encoder V1 ...
-
Netty构建分布式消息队列实现原理浅析
在本人的上一篇博客文章:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇 中,重点向大家介绍了AvatarMQ主要构成模块以及目前存在的优缺点.最后以一个生产者.消费者传递消息的例子, ...
-
PHP消息队列用法实例分析
这篇文章主要介绍了PHP消息队列用法,结合实例形式分析了PHP消息队列用于Linux下进程间通信的相关技巧,需要的朋友可以参考下 该消息队列用于linux下,进程通信 队列状态信息:具体参考手册
-
redis作为消息队列的原理
Redis队列功能介绍 List 转:https://blog.csdn.net/cestlavieqiang/article/details/84197736 常用命令: Blpop删除,并获得该列 ...
-
Rabbimq必备基础之对高级消息队列协议AMQP分析及Rabbitmq本质介绍
MQ的一个产品... [消息队列] 1. MSMQ windows自带的一个服务... [petshop],message存放在文件系统中. 最原始的消息队列... [集群,消息确认,内存化,高可用, ...
-
自制MFC消息响应定位器+原理分析
mfc里面有张消息映射表(MESSAGE_MAP),消息都是通过这张表来分发到相应函数里的. 这个是我自制的定位器,从vc6.0到现在的2013生成的mfc都可以用,全静态扫描并已处理动态基址. 下面 ...
随机推荐
-
lucene/solr 修改评分规则方法总结
说明:由于solr底层使用的是lucene,因此修改solr打分机制归根结底还是依赖于lucene的打分机制,本文主要讨论lucene的打分机制. 本文说明lucene 常用的四种影响评分结果的方式. ...
-
学习Sass之安装Sass(一)
为什么使用Sass 作为前端(html.javascript.css)的三大马车之一的css,一直以静态语言存在,HTML5火遍大江南北了.javascript由于NODE.JS而成为目前前后端统一开 ...
-
WebMethod属性详解
WebMethod有6个属性:.Description.EnableSession.MessageName.TransactionOption.CacheDuration.BufferResponse ...
-
[Angular 2] Use Service use Typescript
When creating a service, need to inject the sercive into the bootstrap(): import {bootstrap, Compone ...
-
禁用Java DNS缓存-Disable DNS caching
Once an application has performed network access (i.e. urlconnection, parsing of xml document with e ...
-
给VIM和Terminal配色:Solarized
给VIM和Terminal配色:Solarized 最近在学习使用VIM.我选择Solarized配色.相信很多人也都在用. 官网地址: http://ethanschoonover.com/sola ...
-
使用wmic.exe绕过应用程序白名单(多种方法)
一.Wmic.exe wmic实用程序是一款Microsoft工具,它提供一个wmi命令行界面,用于本地和远程计算机的各种管理功能,以及wmic查询,例如系统设置.停止进程和本地或远程运行脚本.因 ...
-
Linux提权exp大全
如下表 #CVE #Description #Kernels CVE-2017-1000367 [Sudo] (Sudo 1.8.6p7 - 1.8.20) CVE-2017-7494 [Samba ...
-
set -o vi AIX下shell
set -o vi 再用esc+K键就可以使用上一条指令了 esc+kesc+j上下翻 ksh默认是emacs风格的.set -o emacs 在AIX下使用自己已经使用过的命令 在AIX下使用,默认 ...
-
关于verilog中小数直接赋值
verilog中小数直接赋值的话小数会近似成1,如0.1,0.6,0.9赋值的话就会变成1,5.1,5.9也都会变成6.并且quartus默认小数是64位.