图2-1
接着,我们再启动服务端程序,发现客户端程序又开始正常运行了,连续发了 10 条消息后退出。这种现象似乎和 Socket API 的 TCP 通信行为不大相同,接下来我们来验证一下。我们使用 Socket API 来实现一个与前面相同的“请求-响应”模式的例子。下面是服务端代码(hwserver2.c),逻辑和之前的 hwserver.c 类似,监听 6666 端口,不停地接受、打印并返回信息,每次处理后停止 1 秒。
#include <stdio.h>#include <stdlib.h>然后是客户端代码(hwclient2.c),逻辑和之前的 hwclient.c 相同,向服务端连续发送 10 条消息,接受并打印返回信息。
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#define PORTNUM 6666
#define CONNMAX 5
#define BUFFSIZE 32
#define die(err) { perror(err); exit(1); }
void c_action(int sock)
{
char buffer[BUFFSIZE];
int received = -1;
char *send_s = "World";
// 接收消息
while ((received = recv(sock, buffer, BUFFSIZE, 0)) > 0) {
buffer[received] = 0;
printf ("Recv %s\n", buffer);
// 发送反馈
if (send(sock, send_s, received, 0) != received) {
die("failed to send");
}
printf ("Send %s\n", send_s);
sleep(1);
}
close(sock);
}
int main(void)
{
struct sockaddr_in s_addr, c_addr;
int s_sock, c_sock;
if ((s_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
die("failed to create socket");
}
memset(&s_addr, 0, sizeof(s_addr));
s_addr.sin_port = htons(PORTNUM);
s_addr.sin_family = AF_INET;
if (bind(s_sock, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0) {
die("failed to bind");
}
if (listen(s_sock, CONNMAX) < 0) {
die("failed to listen");
}
while (1) {
unsigned int c_addr_len = sizeof(c_addr);
if ((c_sock = accept(s_sock, (struct sockaddr *)&c_addr, &c_addr_len)) < 0) {
die("failed to accept");
}
c_action(c_sock);
}
close(s_sock);
return 0;
}
#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <sys/types.h>#include <sys/socket.h>#include <string.h>#include <arpa/inet.h>#define PORTNUM 6666#define BUFFSIZE 32#define die(err) { perror(err); exit(1); }int main(void){ int c_sock; struct sockaddr_in c_addr; char buffer[BUFFSIZE]; int msgcount, received; if ((c_sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { die("failed to create socket"); } memset(&c_addr, 0, sizeof(c_addr)); c_addr.sin_port = htons(PORTNUM); c_addr.sin_family = AF_INET; if (connect(c_sock, (struct sockaddr *)&c_addr, sizeof(c_addr)) < 0) { die("failed to connect"); } msgcount = 0; char *send_s = "Hello"; int len_i = strlen(send_s); while (msgcount < 10) { received = 0; // 发送消息 if (send(c_sock, send_s, len_i, 0) != len_i) { die("failed to send"); } printf("Send %s\n", send_s); // 接收反馈 while(received < len_i){ int bytes = 0; if ((bytes = recv(c_sock, buffer, BUFFSIZE-1, 0)) < 1) { die("failed to recv"); } received += bytes; buffer[bytes] = '\0'; printf("Recv %s\n", buffer); } msgcount++; } close(c_sock); return 0;}接着,我们也在未启动 hwserver2 的情况下运行 hwclient2,发现程序直接报错“Connection refused”的连接错误(如图2-2),说明连接不上服务器,这显然是符合常理的。相比之下,ZeroMQ 的表现就显得比较怪异了。因为我们在未启动 hwserver 的情况下运行 hwclient,发现程序并没有报连接错误,反而是在发送过一条消息之后阻塞住了(如图2-1);接着我们尝试启动 hwserver,发现 hwclient 又继续运行下去了,直至把 10 条消息发送完毕。
图2-2
TIP:关于 hwserver 和 hwclient 的代码请参考《ZeroMQ 深度探索(一)》
从以上现象可以看出,ZeroMQ 的 zmq_connect 方法其实只是建立了一个“虚连接”,和 Socket 的 connect 方法完全不同;实际上,从 ZeroMQ 的源码中也可以看出这点。起初我也感觉这个逻辑很奇怪,但实际上正因为有了这个特性,当我们使用 ZeroMQ 构建分布式系统的时候就不需要关心节点启动先后顺序的问题,为我们提供了不少便捷。但是,如果不善用这个特性极有可能导致严重的问题。比如,我们想使用 ZeroMQ 进行无状态模式发送,即类似于 HTTP 的“发送-接收-结束”的模式;假如在发送的过程中网络断线了,就会导致大量请求被阻塞住,严重者可导致服务器资源被耗尽!
如果要解决以上问题,一般的思路是设置超时,ZeroMQ 可以通过使用 zmq_poll 方法或者设置 ZMQ_LINGER 参数来设置请求超时,但是这也可能导致一些问题。超时时间设置太小容易丢失数据,设置太长又会影响运行效率,我们需要的是一个更可靠的网络通信方案。一种简单直接的方式就是对客户端程序进行改造,使之在不稳定的网络环境中也可以稳定运行,请参考以下代码实现。
#include <zmq.h>#include <stdio.h>#include <unistd.h>#include <string.h>#include <assert.h>int main (void){ // Socket to talk to clients void *context = zmq_ctx_new (); void *responder = zmq_socket (context, ZMQ_REP); int rc = zmq_bind (responder, "tcp://*:5555"); assert (rc == 0); char buffer [10]; char *send_s = "World"; while (1) { // 接收消息 zmq_recv (responder, buffer, 10, 0); buffer[5] = 0; printf ("Recv %s\n", buffer); // 发送反馈 zmq_send (responder, send_s, 5, 0); printf ("Send %s\n", send_s); sleep(1); } return 0;}首先是改造过的 ZeroMQ 的服务端代码(hwserver3.c),我们只是在原有代码(hwserver.c)上稍作修改,添加了获取到客户端请求之后返回“World”字符串的逻辑。
然后就是改造过的 ZeroMQ 的客户端代码(hwclient3.c)了,客户端的改动就大了,以下是其主要逻辑要点:
1、循环发送“Hello”字符串到服务端,然后接收返回的字符串“World”并打印出来。
2、当发现连不上服务端时,重试 3 次;如果仍然连不上,则主动结束客户端。
3、设置 ZMQ_LINGER 为 1 毫秒,表示连接不上,立即返回,不会阻塞。
#include <zmq.h>#include <string.h>#include <stdio.h>#include <unistd.h>#define SERVER_ENDPOINT "tcp://localhost:5555"#define REQUEST_TIMEOUT 3000 // msecs, (> 1000!)#define REQUEST_RETRIES 3 // retry before we abandonvoid *zmq_socket_new (void *context){ int linger = 1; void *zsocket = zmq_socket (context, ZMQ_REQ); zmq_setsockopt(zsocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_connect (zsocket, SERVER_ENDPOINT); return zsocket;}int main (void){ void *context = zmq_ctx_new (); void *zsocket = zmq_socket_new(context); char buffer [255]; char *send_s = "Hello"; int retries_left = REQUEST_RETRIES; while (retries_left) { // 发送消息 zmq_send (zsocket, send_s, strlen(send_s), 0); printf ("Send %s\n", send_s); // 重试次数 int expect_reply = 1; while (expect_reply) { // 停止重试 if (retries_left == 0) { printf("Server offline, abandoning ...\n"); break; } // 多路复用 zmq_pollitem_t items [] = { { zsocket, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, REQUEST_TIMEOUT); if (rc == -1) break; // Interrupted if (items [0].revents & ZMQ_POLLIN) { // 接收反馈 int size = zmq_recv (zsocket, buffer, 10, 0); if (size > 255) size = 255; buffer[size] = 0; printf ("Recv %s\n", buffer); if (buffer) { retries_left = REQUEST_RETRIES; expect_reply = 0; } } // 重试连接 else { printf("Retry connecting ...\n"); zmq_close (zsocket); zsocket = zmq_socket_new(context); // 重发消息 zmq_send (zsocket, send_s, strlen(send_s), 0); printf ("Send %s\n", send_s); --retries_left; } } } zmq_close (zsocket); zmq_ctx_destroy (context); return 0;}我们可以进行如下尝试,先启动服务端程序(hwserver3),然后再启动客户端程序(hwclient3),就可以看到客户端和服务端正在通信,客户端发送“Hello”,服务端反馈“World”,一切正常。接着我们停止服务端程序,我们马上发现客户端开始尝试重连(提示“Retry connecting ...”);接着我们马上重新打开服务端,就会发现客户端和服务端又恢复通信了;然后我们把服务端程序再次停止,我们看到客户端尝试重连 3 次之后,最终停止了(提示“Server offline, abandoning ...”),如图2-3所示。
图2-3
以上的设计模式被我们称之为“客户端信任”的模式,通过这种设计,我们建立了一个可控的、相对稳定的 C/S 通信模型。当然,从以上代码中我们也可以看到 ZeroMQ 中多路复用的用法,也就是 int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); 方法的使用,三个参数分别是 poll 项列表、poll 项个数以及 poll 超时时间(毫秒),其中 zmq_pollitem_t 的结构如下:
typedef struct
{
void //*socket//;
int //fd//;
short //events//;
short //revents//;
} zmq_pollitem_t;
此外,ZeroMQ 支持多种多路复用模式(参考源码 poller.hpp),列举如下:
1、select(支持unix/windows)
2、poll(支持unix)
3、epoll(支持linux)
4、kqueue(支持freebsd)
5、devpoll(zmq自研的poll)
其中,Linux 下默认使用的是 epoll 方式;当然,在编译的时候也可以通过 --with-poller 参数来配置所需的多路复用模式。话说回来,ZeroMQ 的网络通信模型和 Socket 还是有很多不同的,使用的时候一定要特别注意。在下篇中我们将介绍 ZeroMQ 消息的包装方式,进一步理解 ZeroMQ 网络通信的细节,学习其构建分布式系统的理念。