1、什么是ZMQ
ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上运行。
ZeroMQ(也拼写为ÖMQ、0MQ或ZMQ)是一个高性能异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。
ZeroMQ支持多种传输(TCP、进程内、进程间、多播、WebSocket等)上的通用消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递与线程间消息传递一样简单。这使您的代码保持清晰、模块化和极易扩展。
ZeroMQ是由大量贡献者开发的。有许多流行编程语言的第三方绑定,以及C#和Java的本机端口。
2、ZMQ的特点
1、 组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须现有服务端启动,在启动客户端,否则会报错。
2、ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。
3、ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。
4、ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。
5、ZMQ提供了多种模式进行消息路由,如请求-应答模式,发布-订阅模式等,这些模式可以用来搭建网络拓扑结构。
6、ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。同时ZeroMQ不在乎目的是否存在。
7、TCP的通信拓扑是一对一的,而ZMQ可以是一对一、一对多、多对一或者多对多。
8、ZeroMQ传输的是消息,TCP传输字节。
3、ZMQ与TCP的区别
3.1、连接的区别
1、使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm。
2、当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定。
3、连接是异步的,并由一组消息队列做缓冲。
4、连接会表现出某种消息模式,这是由创建连接的套接字类型决定的。
5、一个套接字可以有多个输入和输出连接。
6、ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了。
7、应用程序无法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。
3.2、传输数据的区别
1、ZMQ套接字传输的是消息,而不是字节(TCP)或帧(UDP)。消息指的是一段指定长度的二进制数据块,这种设计是为了性能优化而考虑的,所以可能会比较难以理解。
2、ZMQ套接字在后台进行I/O操作,也就是说无论是接收还是发送消息,它都会先传送到一个本地的缓冲队列,这个内存队列的大小是可以配置的。
3、ZMQ套接字可以和多个套接字进行连接(如果套接字类型允许的话)。TCP协议只能进行点对点的连接,而ZMQ则可以进行一对多(类似于无线广播)、多对多(类似于邮局)、多对一(类似于信箱),当然也包括一对一的情况。
4、ZMQ套接字可以发送消息给多个端点(扇出模型),或从多个端点中接收消息(扇入模型)。
4、编译安装
4.1、安装依赖
sudo apt-get install libtool pkg-config build-essential autoconf automake
4.2、编译安装ZMQ使用的加密库
git clone git://github.com/jedisct1/libsodium.git
cd libsodium
./autogen.sh
./configure
make check
sudo make install
sudo ldconfig
cd ../
4.3、编译安装libzmq
git clone git://github.com/zeromq/libzmq.git
cd libzmq
./autogen.sh
./configure –with-libsodium
make
sudo make install
sudo ldconfig
cd ../
4.4、安装ZMQ的c库
添加编译选项 -lczmq -lzmq。
git clone git://github.com/zeromq/czmq.git
cd czmq
./autogen.sh
./configure && make check
sudo make install
sudo ldconfig
cd -
4.5、添加ZMQ的C++库
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # bootstrap-vcpkg.bat for Powershell
./vcpkg integrate install
./vcpkg install cppzmq
cd -
5、使用
5.1、cmake使用
cmake下使用,需要再CMakeList文件中添加如下内容:
#find cppzmq wrapper, installed by make of cppzmq
find_package(cppzmq)
target_link_libraries(*Your Project Name* cppzmq)
5.2、实例
C++开发可以参考如下实例代码进行开发工作: server端:
#include <zmq.hpp>
int main()
{
zmq::context_t ctx;
zmq::socket_t sock(ctx, zmq::socket_type::push);
sock.bind("inproc://test");
sock.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
}
Client端:
#include <iostream>
#include <zmq_addon.hpp>
int main()
{
zmq::context_t ctx;
zmq::socket_t sock1(ctx, zmq::socket_type::push);
zmq::socket_t sock2(ctx, zmq::socket_type::pull);
sock1.bind("tcp://127.0.0.1:*");
const std::string last_endpoint =
sock1.get(zmq::sockopt::last_endpoint);
std::cout << "Connecting to "
<< last_endpoint << std::endl;
sock2.connect(last_endpoint);
std::array<zmq::const_buffer, 2> send_msgs = {
zmq::str_buffer("foo"),
zmq::str_buffer("bar!")
};
if (!zmq::send_multipart(sock1, send_msgs)) {
return 1;
}
std::vector<zmq::message_t> recv_msgs;
const auto ret = zmq::recv_multipart(
sock2, std::back_inserter(recv_msgs));
if (!ret) {
return 1;
}
std::cout << "Got " << *ret
<< " messages" << std::endl;
return 0;
}
在之前的示例中,主程序的循环体内会做以下几件事:
1、等待套接字的消息。
2、处理消息。
3、返回第一步。
6、zmq_poll()的使用
如果我们想要读取多个套接字中的消息呢?最简单的方法是将套接字连接到多个端点上,让ZMQ使用公平队列的机制来接受消息。如果不同端点上的套接字类型是一致的,那可以使用这种方法。但是,如果一个套接字的类型是PULL,另一个是PUB怎么办?如果现在开始混用套接字类型,那将来就没有可靠性可言了。
正确的方法应该是使用zmq_poll()函数。更好的方法是将zmq_poll()包装成一个框架,编写一个事件驱动的反应器,但这个就比较复杂了,我们这里暂不讨论。
我们先不使用zmq_poll(),而用NOBLOCK(非阻塞)的方式来实现从多个套接字读取消息的功能。下面将气象信息服务和并行处理这两个示例结合起来:
msreader: Multiple socket reader in C
#include "zhelpers.h"
int main (void)
{
// 准备上下文和套接字
void *context = zmq_init (1);
// 连接至任务分发器
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 连接至天气服务
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
// 处理从两个套接字中接收到的消息
// 这里我们会优先处理从任务分发器接收到的消息
while (1) {
// 处理等待中的任务
int rc;
for (rc = 0; !rc; ) {
zmq_msg_t task;
zmq_msg_init (&task);
if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {
// 处理任务
}
zmq_msg_close (&task);
}
// 处理等待中的气象更新
for (rc = 0; !rc; ) {
zmq_msg_t update;
zmq_msg_init (&update);
if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {
// 处理气象更新
}
zmq_msg_close (&update);
}
// 没有消息,等待1毫秒
s_sleep (1);
}
// 程序不会运行到这里,但还是做正确的退出清理工作
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}
这种方式的缺点之一是,在收到第一条消息之前会有1毫秒的延迟,这在高压力的程序中还是会构成问题的。此外,你还需要翻阅诸如nanosleep()的函数,不会造成循环次数的激增。
示例中将任务分发器的优先级提升了,你可以做一个改进,轮流处理消息,正如ZMQ内部做的公平队列机制一样。
下面,让我们看看如何用zmq_poll()来实现同样的功能:
mspoller: Multiple socket poller in C
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// 连接任务分发器
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// 连接气象更新服务
void *subscriber = zmq_socket (context, ZMQ_SUB);
zmq_connect (subscriber, "tcp://localhost:5556");
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);
// 初始化轮询对象
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ subscriber, 0, ZMQ_POLLIN, 0 }
};
// 处理来自两个套接字的消息
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);
// 处理任务
zmq_msg_close (&message);
}
if (items [1].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (subscriber, &message, 0);
// 处理气象更新
zmq_msg_close (&message);
}
}
// 程序不会运行到这儿
zmq_close (receiver);
zmq_close (subscriber);
zmq_term (context);
return 0;
}