看了有三四天的的源码,(当然没怎么好好看了),突然发现对mosquitto的源码有了一点点感觉,于是在第五天决定在Linux环境下部署mosquitto。
使用传统源码安装步骤:
步骤1:http://mosquitto.org/files/source/官网下载源码,放到Linux环境中。解压后,找到主要配置文件config.mk,其中包含mosquitto的安装选项,需要注意的是,默认情况下mosquitto的安装需要OpenSSL(一个强大的安全套接字层密码库)的支持,若不需要SSL,则需要关闭config.mk里面与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。笔者这里直接将这两句话屏蔽掉了。
步骤2 :配置完毕后,输入“make install”进行安装(需要root权限),这里编译失败出现了一个问题:
error while loading shared libraries:libmosquitto.so.1 : cannot open shared object file: No such file or directory
所以问题很清楚,没有找到这个动态链接库。遇到这种问题就有两种情况:
1).确实没有这个库或者库的版本不对 。 2)这个库所在的路径不在系统查找范围内。
笔者感觉这个库名字很眼熟,果然在“make install”命令执行的打印信息中发现蛛丝马迹:
“install -s --strip-program=strip libmosquitto.so.1 /usr/local/lib/libmosquitto.so.1”
笔者在这个路径下,找到了该动态库,说明现在的问题应该是属于第二种情况(而且是官方的代码,也不应该会犯第一种问题),于是在网上找到了解决方案。
1) 如果共享库文件安装到了/lib或/usr/lib目录下, 那么需执行一下ldconfig命令
ldconfig命令的用途, 主要是在默认搜寻目录(/lib和/usr/lib)以及动态库配置文件/etc/ld.so.conf内所列的目录下, 搜索出可共享的动态链接库(格式如lib*.so*), 进而创建出动态装入程序(ld.so)所需的连接和缓存文件. 缓存文件默认为/etc/ld.so.cache, 此文件保存已排好序的动态链接库名字列表.
2) 如果共享库文件安装到了/usr/local/lib(很多开源的共享库都会安装到该目录下)或其它"非/lib或/usr/lib"目录下, 那么在执行ldconfig命令前, 还要把新共享库目录加入到共享库配置文件/etc/ld.so.conf中, 如下:(需要root权限执行下面命令)
# cat /etc/ld.so.conf
include ld.so.conf.d/*.conf
# echo "/usr/local/lib" >> /etc/ld.so.conf
# ldconfig
(详情参阅http://blog.iyunv.com/uid-26212859-id-3256667.html)
这里笔者就是使用第二种情况的办法,成功完成编译。
完成后会在系统命令行里发现mosquitto、mosquitto_sub、mosquitto_pub三个工具(网上说有四个,还有一个mosquitto_passwd,用于管理密码,应该是关闭SSL的原因),分别用于启动代理、订阅消息和发布消息。
测试
步骤1:开启一个终端:输入“mosquitto”命令,结果如下图,服务启动,因为一直监听,所以不会看到命令行。
正常情况
输入“mosquitto”如果如下图报错,发现报错是地址已被使用,可以使用 "ps -e "查看进程和“netstat -apn | grep :1883”来查看谁占用端口,可使用“kill -s 9 pid号”杀死该进程,然后重新输入“mosquitto”命令即可得到上图正确结果。
报错
步骤2:开启第二个终端,输入“mosquitto_sub -t 主题名 -i 用户名”, (后面的“-i 用户名可省略”)
例如:mosquitto_sub -t mqtt 结果如图,由于一直监听,所以也不会看到命令行。
发布消息前第二个终端截图
步骤3:开启第三个终端,输入“mosquitto_pub -t 主题名 -i 用户名 -m 要发送的消息”
(如果要发送的消息中有空格,需用引号括起来)
例如:mosquitto_pub -h localhost -t mqtt -m "hello world"
则第二个终端可以收到这条信息。笔者看到其命令行有文件传输,又尝试传一个文件(内容只有一句话),第二个终端会直接显示文件的内容(截图中“hello World”下面的那句话就是)。尝试一个大文件的传输,将一个7M的书传过去,首先是可以传,但是第二个窗口显示的全是乱码,传输的速度也是一个问题。
发布消息后的第二个终端截图
这里之所以会想到传文件是因为看到mosquitto_pub的命令参数中有关于把文件当做message传输的记录,如图:
这里的文件上限默认是256M。逻辑中有对文件大小的判断,超过256M的文件则不传。不知道这里如果吧这个值修改更大,会不会产生影响,笔者没有尝试,因为传7M的文件都感觉很慢。(这个问题在MQTT协议介绍中可以得到答案,MQTT文件长度的表示是用1至4个字节来表示,而其表示长度的方式又有特殊的加密方式,按照这种方式,其最大表示的长度为256M)
测试总结
三个终端,一个用来开启服务,一个执行mosquitto_sub来订阅消息,与服务器保持长连接,随时接收来自服务器推送的消息,最后一个终端则用来发布消息。这个测试的结果现在是正确的,但仍存在局限性,还有以下几个问题需要注意:
1)了解mosquitto_sub和mosquitto_pub命令背后是如何执行的,需要修改,订阅端的处理肯定不能仅仅是显示内容 到标准输出上。
2)了解mosquitto命令的逻辑,这里包含的内容很多,估计也是最难的。
3)这里的实验是在本地传输,需要做一个客户端出来(客户端可能是Android端或者MCU端),看是否可以正常传输,还有就是能传多大的数据,允许同时连入的客户数有多少(据说是20000以上)。
mosquito monitor idea
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"
mosquito 是一个MQTT 服务器。目前只在公司移动互联网服务器上环境部署过,用的比较少,show 一下安装过程:
wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz
tar -zxvf mosquitto-1.0.3.tar.gz
cd mosquitto-1.0.3
make WITH_TLS_PSK=no (遇到undefined reference to `SSL_CTX_set_psk_server_callback' ,添加参数WITH_TLS_PSK=no 禁用SSL PSK 支持)
make install prefix=/home/mosquitto
mkdir /home/mosquitto/etc
mv /etc/mosquitto/* /home/mosquitto/etc/
strip /home/mosquitto/bin/*
strip /homo/mosquitto/sbin/*
strip /homo/mosquitto/lib/*
echo "/home/mosquitto/lib/" >> /etc/ld.so.conf
ldconfig -f /etc/ld.so.conf
修改 /homo/mosquitto/etc/mosquitto.conf 用户:
user nobody
(其它参数目前使用默认值)
启动服务(如有服务相关错误,检查/home/mosquitto/mosquitto.log,端口默认1883)
/home/mosquitto/sbin/mosquitto -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1
终端测试:
客户端
mosquitto_sub -h SERVERIP -t test
服务器端执行后
/home/mosquitto/bin/mosquitto_pub -t test -m "123456"
客户端会成功收到"123456"
查看Mqtt订阅者运行状况
mosquitto_sub -v -t \$SYS/#
或者细化为其中一个命令
mosquitto_sub -v -t ‘$SYS/broker/clients/active’
monitor statitics script
100万并发连接之笔记
epoll详解
select,poll和epoll区别
高并发的epoll多线程实例
mosquitto的pthread无法使用
C1000连接
mosquitto源代码分析
Java NIO 单线程服务器
lighthttp vs apache vs ngix
Java heap space error summary
mqtt benchmark can not create native thread
mosquitto 1.2.1代码优化
mosquitto 100k测试场景
Mosquitto pub/sub代码分析
优化:
对于后续的提高优化的地方,简单记录几点:
发送数据用writev
poll -> epoll ,用以支持更高的冰法;
改为单线程版本,降低锁开销,目前锁开销还是非常大的。目测可以改为单进程版本,类似redis,精心维护的话应该能达到不错的效果;
网络数据读写使用一次尽量多读的方式,避免多次进入系统调用;
内存操作优化。不free,留着下次用;
考虑使用spwan-fcgi的形式或者内置一次启动多个实例监听同一个端口。这样能更好的发挥机器性能,达到更高的性能;
(MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads
MaxProcessMemory 指的是一个进程的最大内存
JVMMemory JVM内存
ReservedOsMemory 保留的操作系统内存
ThreadStackSize 线程栈的大小
在java语言里, 当你创建一个线程的时候,虚拟机会在JVM内存创建一个Thread对象同时创建一个操作系统线程,而这个系统线程的内存用的不是JVMMemory,而是系统中剩下的内存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。
一哥们发送线程数和xss以及max user process 有关
epoll + 多线程实现并发网络连接处理
Mosquitto持久层群推消息实现思路
JAVA使用EPoll来进行NIO处理的方法
JDK 6.0 以及JDK 5.0 update 9 的 nio支持epoll (仅限 Linux 系统 ),对并发idle connection会有大幅度的性能提升,这就是很多网络服务器应用程序需要的。
启用的方法如下:
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
单进程FD上限是最大可以打开文件的数目,
这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max
mosquitto -c /etc/mosquitto/mosquitto.conf -d”即可开启服务
压力测试: 系统参数修改
ulimit -u 12000 (max processes)
ulimit -n 12000 (max files)
测试最大连接数
#!/bin/bash
c=1
while [ $c -le 18000 ]
do
mosquitto_sub -d -t hello/world -k 900 &
(( c++ ))
done
# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
kill -9 `ps -ef|grep mosquitto|awk '{print $2}'`
ps -ef|grep report |grep -v grep|awk '{print $2}' |xargs kill -9
pkill -9 mosquitto
killall -9 mosquitto
查看cpu 占用高的线程
ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu
查看线程的I/O信息
/proc/15938/task/15942/fd
lsof -c mosquitto
[[email protected] fd]# pstack 15938
Thread 5 (Thread 0x7f280e377700 (LWP 15939)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 4 (Thread 0x7f280d976700 (LWP 15940)):
#0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1 0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f280cf75700 (LWP 15941)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f280c574700 (LWP 15942)):
#0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1 0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c7c5 in mosquitto_main_loop_4_epoll ()
#2 0x000000000040414b in main ()
实际测试
Test case 1
1 10934 max connections with QoS 0
[[email protected] ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
ESTABLISHED 10934
LISTEN 12
脚本输出
Client mosqsub/13526-wwwu.mqtt sending CONNECT
Client mosqsub/13526-wwwu.mqtt received CONNACK
Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13526-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/13523-wwwu.mqtt sending CONNECT
Client mosqsub/13523-wwwu.mqtt received CONNACK
Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13523-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Test Case 2:
该测试只是验证能打开的最大连接数,意义不太大
max files:
ulimit -n 15000
[[email protected] test]# netstat -na|grep ESTAB|grep 1883|wc -l
15088
cpu 2 core 只使用100%(total 200%),目前没有publish数据,所以memory使用量很少3%,
还可以继续提升
vmstat
[[email protected] test]# vmstat 3
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 24080 1865596 233880 266228 1 3 2 4 2 11 0 0 99 0 0
1 0 24080 1865588 233880 266228 0 0 0 0 1030 30 17 33 50 0 0
[[email protected] test]# top
top - 03:17:46 up 13 days, 10 min, 4 users, load average: 1.09, 0.83, 0.37
Tasks: 164 total, 2 running, 162 sleeping, 0 stopped, 0 zombie
Cpu0 : 31.2%us, 68.8%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 0.0%us, 0.3%sy, 0.0%ni, 99.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 1091948k used, 1865588k free, 233880k buffers
Swap: 3096568k total, 24080k used, 3072488k free, 266228k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6709 root 20 0 150m 113m 784 R 100.0 3.9 15:16.63 mosquitto
发现问题
no route to host;
telnet: Unable to connect to remote host: No route to host
觉得甚是差异,估计是虚拟机装了有问题,就把虚拟机中的防火墙给清了一下,发现可行。
[[email protected] ~]$ sudo iptables -F
mosquitto无法启动
发现netstat -an|grep 1883 没有发现记录,发现mosquitto.db文件很大445M,把该文件删除后,
可以正常启动
2014/05/07 结果
------------------------------------------------------------------------------------------------
【subscriber】
开6000个subscribe active 连接,脚本如下
java -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883
kswapd0 进程跳来跳去
top - 13:16:13 up 31 min, 4 users, load average: 617.83, 187.04, 65.29
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu(s): 2.9%us, 58.7%sy, 0.0%ni, 0.0%id, 34.5%wa, 0.0%hi, 3.9%si, 0.0%st
Mem: 2957536k total, 2899268k used, 58268k free, 984k buffers
Swap: 3096568k total, 805564k used, 2291004k free, 14656k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 2538 root 20 0 25.2g 1.7g 4420 S 122.1 60.3 5:27.12 java
[[email protected] ~]# free -m
total used free shared buffers cached
Mem: 2888 2810 77 0 0 15
id=1,rate=996.20,threads=6000
id=1,rate=1007.30,threads=6000
之后司机
publish 只有700M可用内存
[[email protected] Local]# top
top - 17:13:51 up 3:44, 2 users, load average: 0.00, 0.00, 0.05
Tasks: 124 total, 1 running, 123 sleeping, 0 stopped, 0 zombie
Cpu(s): 14.7%us, 15.2%sy, 0.0%ni, 64.6%id, 0.0%wa, 0.0%hi, 5.4%si, 0.0%st
Mem: 2011348k total, 1736976k used, 274372k free, 23976k buffers
Swap: 8388600k total, 0k used, 8388600k free, 803732k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1946 root 20 0 3362m 591m 7292 S 66.7 30.1 3:22.69 java
最多2500个publish active 连接, throughput 3242 msgs/second,
id=5,rate=3084.80,threads=2000
id=5,rate=2324.60,threads=2000
id=5,rate=3176.30,threads=2000
id=5,rate=3091.40,threads=2000
[mosquitto]
connection 后的top
[[email protected] mosquitto]# top -b
top - 05:50:55 up 2:00, 4 users, load average: 1.00, 0.81, 0.74
Tasks: 161 total, 2 running, 159 sleeping, 0 stopped, 0 zombie
Cpu(s): 31.9%us, 7.4%sy, 0.0%ni, 54.8%id, 3.2%wa, 0.0%hi, 2.6%si, 0.0%st
Mem: 2957536k total, 2518020k used, 439516k free, 31976k buffers
Swap: 3096568k total, 0k used, 3096568k free, 209064k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 R 77.9 42.5 93:35.86 mosquitto
[[email protected] ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l
8000
遇到问题
[[email protected] ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
FIN_WAIT1 298
ESTABLISHED 2372
LISTEN 12
tcp 0 21848 9.119.154.107:1883 9.119.154.160:55851(sub) FIN_WAIT1
tcp 0 37393 9.119.154.107:1883 9.119.154.160:57275 FIN_WAIT1
tcp 0 0 9.119.154.107:1883 9.119.154.160:56913 FIN_WAIT2
tcp 0 40545 9.119.154.107:1883 9.119.154.160:55864 FIN_WAIT1
netstat显示的连接状态有几种WAIT: FIN_WAIT_1,FIN_WAIT_2,CLOSE_WAIT和TIME_WAIT. 他们的含义要从TCP的连接中断过程说起
Server Client
-------- FIN -------->
<------- ACK ---------
<------- FIN ---------
-------- ACK -------->
假设服务器主动关闭连接(Active Close)
服务器首先向客户机发送FIN包,然后服务器进入FIN_WAIT_1状态。
客户机向服务器确认FIN包收到,向服务器发送FIN/ACK,客户机进入CLOSE_WAIT状态。
服务器收到来自客户机的FIN/ACK后,进入FIN_WAIT_2状态
现在客户机进入被动关闭(“passive close”)状态,客户机操作系统等待他上面的应用程序关闭连接。一旦连接被关闭,客户端会发送FIN包到服务器
当服务器收到FIN包后,服务器会向客户机发送FIN/ACK确认,然后进入著名的TIME_WAIT状态
由于在连接关闭后,还不能确定所有连接关闭前的包都被服务器接受到了(包的接受是没有先后顺序的),因此有了TIME_WAIT状态。在这个状态中,服务器仍然在等待客户机发送的但是还未到达服务器的包。这个状态将保持2*MSL的时间,这里的MSL指的是一个TCP包在网络中存在的最长时间。一般情况下2*MSL=240秒。
-------------------------------------------------------------------------------------
10000 sub connections
mosquitto 2G 内存 32% CPU
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu0 : 3.7%us, 11.6%sy, 0.0%ni, 84.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 4.3%us, 13.6%sy, 0.0%ni, 82.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 2202836k used, 754700k free, 34916k buffers
Swap: 3096568k total, 0k used, 3096568k free, 48164k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 S 32.6 42.5 148:35.98 mosquitto
-----------------boss worker multithread model---------------------------
int queue[QUEUE_SIZE];
This is the main thread. It creates a queue struct (defined elsewhere) with methods enqueue,
dequeue, empty, etc. When the server accepts a connection,
it enqueues the socket that the incoming connection is on.
The worker threads which were dispatched at the beginning are constantly checking this queue to
see if any jobs have been added, and if there are jobs, then they dequeue the socket, connect to that port,
and read/parse/write the incoming http request.
int main(int argc, char* argv[])
{
int hSocket, hServerSocket;
struct hostent* pHostInfo;
struct sockaddr_in Address;
int nAddressSize = sizeof(struct sockaddr_in);
int nHostPort;
int numThreads;
int i;
init(&head,&tail);
/
hServerSocket=socket(AF_INET,SOCK_STREAM,0);
if(hServerSocket == SOCKET_ERROR)
{
printf("\nCould not make a socket\n");
return 0;
}
Address.sin_addr.s_addr = INADDR_ANY;
Address.sin_port = htons(nHostPort);
Address.sin_family = AF_INET;
printf("\nBinding to port %d\n",nHostPort);
if(bind(hServerSocket,(struct sockaddr*)&Address,sizeof(Address)) == SOCKET_ERROR) {
printf("\nCould not connect to host\n");
return 0;
}
getsockname(hServerSocket, (struct sockaddr *) &Address,(socklen_t *)&nAddressSize);
printf("Opened socket as fd (%d) on port (%d) for stream i/o\n",hServerSocket, ntohs(Address.sin_port));
printf("Server\n\
sin_family = %d\n\
sin_addr.s_addr = %d\n\
sin_port = %d\n"
, Address.sin_family
, Address.sin_addr.s_addr
, ntohs(Address.sin_port)
);
//Up to this point is boring server set up stuff. I need help below this.
/
if(listen(hServerSocket,QUEUE_SIZE) == SOCKET_ERROR) {
printf("\nCould not listen\n");
return 0;
}
while(1) {
pthread_mutex_lock(&mtx);
printf("\nWaiting for a connection");
while(!empty(head,tail)) {
pthread_cond_wait (&cond2, &mtx);
}
hSocket = accept(hServerSocket,(struct sockaddr*)&Address,(socklen_t *)&nAddressSize);
printf("\nGot a connection");
enqueue(queue,&tail,hSocket);
pthread_mutex_unlock(&mtx);
pthread_cond_signal(&cond); // wake worker thread
}
}
void *worker(void *threadarg) {
while(true)
{
pthread_mutex_lock(&mtx);
while(empty(head,tail)) {
pthread_cond_wait(&cond, &mtx);
}
int hSocket = dequeue(queue,&head);
unsigned nSendAmount, nRecvAmount;
char line[BUFFER_SIZE];
nRecvAmount = read(hSocket,line,sizeof line);
printf("\nReceived %s from client\n",line);
/
if(close(hSocket) == SOCKET_ERROR) {
printf("\nCould not close socket\n");
return 0;
}
pthread_mutex_unlock(&mtx);
pthread_cond_signal(&cond);
}
epoll与select、poll区别
1、相比于select与poll,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。内核中的select与poll的实现是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。
2、epoll的实现是基于回调的,如果fd有期望的事件发生就通过回调函数将其加入epoll就绪队列中,也就是说它只关心“活跃”的fd,与fd数目无关。
3、内核 / 用户空间 内存拷贝问题,如何让内核把 fd消息通知给用户空间呢?在这个问题上select/poll采取了内存拷贝方法。而epoll采用了共享内存的方式。
4、epoll不仅会告诉应用程序有I/0 事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个fd集合。
epoll 的EPOLLLT (水平触发,默认)和 EPOLLET(边沿触发)模式的区别
1、EPOLLLT:完全靠kernel epoll驱动,应用程序只需要处理从epoll_wait返回的fds,这些fds我们认为它们处于就绪状态。此时epoll可以认为是更快速的poll。
2、EPOLLET:此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关注这个fd的任何状态信息,(从epoll队列移除)直到应用程序通过读写操作(非阻塞)触发EAGAIN状态,epoll认为这个fd又变为空闲状态,那么epoll又重新关注这个fd的状态变化(重新加入epoll队列)。随着epoll_wait的返回,队列中的fds是在减少的,所以在大并发的系统中,EPOLLET更有优势,但是对程序员的要求也更高,因为有可能会出现数据读取不完整的问题,举例如下:
假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发,那么这个fd变成就绪状态就会从epoll 队列移除,很可能epoll_wait 会一直阻塞,忽略尚未读取的1k数据,与此同时对方还在等待着我们发送一个回复ack,表示已经接收到数据;如果是电平触发,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。
man epoll example
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
epollfd = epoll_create(10);
if (epollfd == -1) {
perror("epoll_create");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_pwait");
exit(EXIT_FAILURE);
}
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &local, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
Mosquitto简要教程(安装/使用/测试)
上篇文章《Android主流推送方案分析(MQTT/XMPP/GCM)》中,我们给大家介绍了,如何在移动领域使用灵巧的消息传输协议MQTT来完成消息推送,最后也提到了开源项目Mosquitto。实际上,Mosquitto是一个实现了MQTT3.1协议的代理服务器,由MQTT协议创始人之一的Andy Stanford-Clark开发,它为我们提供了非常棒的轻量级数据交换的解决方案。本文的主旨在于记录Mosquitto服务的安装和使用,以备日后查阅。
1、获取&安装
Mosquitto提供了Windows、Linux以及qnx系统的版本,安装文件可从http://mosquitto.org/files/binary/地址中获取(建议使用最新的1.1.x版本)。Windows系统下的安装过程非常简单,我们甚至可以把Mosquitto直接安装成为系统服务;但是,在实际应用中,我们更倾向于使用Linux系统的服务器,接下来我们就将重点介绍Linux版Mosquitto的安装方法。
在Linux系统上安装Mosquitto,本人建议大家使用源码安装模式,最新的源码可从http://mosquitto.org/files/source/地址中获取。解压之后,我们可以在源码目录里面找到主要的配置文件config.mk,其中包含了所有Mosquitto的安装选项,详细的参数说明如下:
[html] view plaincopy
# 是否支持tcpd/libwrap功能.
#WITH_WRAP:=yes
# 是否开启SSL/TLS支持
#WITH_TLS:=yes
# 是否开启TLS/PSK支持
#WITH_TLS_PSK:=yes
# Comment out to disable client client threading support.
#WITH_THREADING:=yes
# 是否使用严格的协议版本(老版本兼容会有点问题)
#WITH_STRICT_PROTOCOL:=yes
# 是否开启桥接模式
#WITH_BRIDGE:=yes
# 是否开启持久化功能
#WITH_PERSISTENCE:=yes
# 是否监控运行状态
#WITH_MEMORY_TRACKING:=yes
这里需要注意的是,默认情况下Mosquitto的安装需要OpenSSL的支持;如果不需要SSL,则需要关闭config.mk里面的某些与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。接着,就是运行make install进行安装,完成之后会在系统命令行里发现mosquitto、mosquitto_passwd、mosquitto_pub和mosquitto_sub四个工具(截图如下),分别用于启动代理、管理密码、发布消息和订阅消息。
2、配置&运行
安装完成之后,所有配置文件会被放置于/etc/mosquitto/目录下,其中最重要的就是Mosquitto的配置文件,即mosquitto.conf,以下是详细的配置参数说明。
[html] view plaincopy
# =================================================================
# General configuration
# =================================================================
# 客户端心跳的间隔时间
#retry_interval 20
# 系统状态的刷新时间
#sys_interval 10
# 系统资源的回收时间,0表示尽快处理
#store_clean_interval 10
# 服务进程的PID
#pid_file /var/run/mosquitto.pid
# 服务进程的系统用户
#user mosquitto
# 客户端心跳消息的最大并发数
#max_inflight_messages 10
# 客户端心跳消息缓存队列
#max_queued_messages 100
# 用于设置客户端长连接的过期时间,默认永不过期
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# 服务绑定的IP地址
#bind_address
# 服务绑定的端口号
#port 1883
# 允许的最大连接数,-1表示没有限制
#max_connections -1
# cafile:CA证书文件
# capath:CA证书目录
# certfile:PEM证书文件
# keyfile:PEM**文件
#cafile
#capath
#certfile
#keyfile
# 必须提供证书以保证数据安全性
#require_certificate false
# 若require_certificate值为true,use_identity_as_username也必须为true
#use_identity_as_username false
# 启用PSK(Pre-shared-key)支持
#psk_hint
# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 消息自动保存的间隔时间
#autosave_interval 1800
# 消息自动保存功能的开关
#autosave_on_changes false
# 持久化功能的开关
persistence true
# 持久化DB文件
#persistence_file mosquitto.db
# 持久化DB文件目录
#persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4种日志模式:stdout、stderr、syslog、topic
# none 则表示不记日志,此配置可以提升些许性能
log_dest none
# 选择日志的级别(可设置多项)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否记录客户端连接信息
#connection_messages true
# 是否记录日志时间
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客户端ID的前缀限制,可用于保证安全性
#clientid_prefixes
# 允许匿名用户
#allow_anonymous true
# 用户/密码文件,默认格式:username:password
#password_file
# PSK格式密码文件,默认格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL权限配置,常用语法如下:
# 用户限制:user <username>
# 话题限制:topic [read|write] <topic>
# 正则限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允许服务之间使用“桥接”模式(可用于分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 设置桥接的客户端ID
#clientid
# 桥接断开时,是否清除远程服务器中的消息
#cleansession false
# 是否发布桥接的状态信息
#notifications true
# 设置桥接模式下,消息将会发布到的话题地址
# $SYS/broker/connection/<clientid>/state
#notification_topic
# 设置桥接的keepalive数值
#keepalive_interval 60
# 桥接模式,目前有三种:automatic、lazy、once
#start_type automatic
# 桥接模式automatic的超时时间
#restart_timeout 30
# 桥接模式lazy的超时时间
#idle_timeout 60
# 桥接客户端的用户名
#username
# 桥接客户端的密码
#password
# bridge_cafile:桥接客户端的CA证书文件
# bridge_capath:桥接客户端的CA证书目录
# bridge_certfile:桥接客户端的PEM证书文件
# bridge_keyfile:桥接客户端的PEM**文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile
# 自己的配置可以放到以下目录中
include_dir /etc/mosquitto/conf.d
最后,启动Mosquitto服务很简单,直接运行命令行“mosquitto -c /etc/mosquitto/mosquitto.conf -d”即可开启服务。接下来,就让我们尽情体验Mosquitto的强大功能吧!当然,有了Mosquitto,我们就可以安心地抛弃“简陋”的rsmb了,有兴趣的话,大家还可以尝试把Mosquitto服务运用到上一篇的Android推送服务中。
另外,Mosquitto是个异步IO框架,经测试可以轻松处理20000个以上的客户端连接。当然,实际的最大承载量还和业务的复杂度还有比较大的关系。下图是本人在一台普通Linux机器上进行的压力测试结果,大家可以参考。
友情提醒:测试的时候不要忘记调整系统的最大连接数和栈大小,比如:Linux上可用ulimit -n20000 -s512命令设置你需要的系统参数。
http://blog.iyunv.com/shagoo/article/details/7910598
https://github.com/mqtt/mqtt.github.io
http://mosquitto.org/download
使用传统源码安装步骤:
步骤1:http://mosquitto.org/files/source/官网下载源码,放到Linux环境中。解压后,找到主要配置文件config.mk,其中包含mosquitto的安装选项,需要注意的是,默认情况下mosquitto的安装需要OpenSSL(一个强大的安全套接字层密码库)的支持,若不需要SSL,则需要关闭config.mk里面与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。笔者这里直接将这两句话屏蔽掉了。
步骤2 :配置完毕后,输入“make install”进行安装(需要root权限),这里编译失败出现了一个问题:
error while loading shared libraries:libmosquitto.so.1 : cannot open shared object file: No such file or directory
所以问题很清楚,没有找到这个动态链接库。遇到这种问题就有两种情况:
1).确实没有这个库或者库的版本不对 。 2)这个库所在的路径不在系统查找范围内。
笔者感觉这个库名字很眼熟,果然在“make install”命令执行的打印信息中发现蛛丝马迹:
“install -s --strip-program=strip libmosquitto.so.1 /usr/local/lib/libmosquitto.so.1”
笔者在这个路径下,找到了该动态库,说明现在的问题应该是属于第二种情况(而且是官方的代码,也不应该会犯第一种问题),于是在网上找到了解决方案。
1) 如果共享库文件安装到了/lib或/usr/lib目录下, 那么需执行一下ldconfig命令
ldconfig命令的用途, 主要是在默认搜寻目录(/lib和/usr/lib)以及动态库配置文件/etc/ld.so.conf内所列的目录下, 搜索出可共享的动态链接库(格式如lib*.so*), 进而创建出动态装入程序(ld.so)所需的连接和缓存文件. 缓存文件默认为/etc/ld.so.cache, 此文件保存已排好序的动态链接库名字列表.
2) 如果共享库文件安装到了/usr/local/lib(很多开源的共享库都会安装到该目录下)或其它"非/lib或/usr/lib"目录下, 那么在执行ldconfig命令前, 还要把新共享库目录加入到共享库配置文件/etc/ld.so.conf中, 如下:(需要root权限执行下面命令)
# cat /etc/ld.so.conf
include ld.so.conf.d/*.conf
# echo "/usr/local/lib" >> /etc/ld.so.conf
# ldconfig
(详情参阅http://blog.iyunv.com/uid-26212859-id-3256667.html)
这里笔者就是使用第二种情况的办法,成功完成编译。
完成后会在系统命令行里发现mosquitto、mosquitto_sub、mosquitto_pub三个工具(网上说有四个,还有一个mosquitto_passwd,用于管理密码,应该是关闭SSL的原因),分别用于启动代理、订阅消息和发布消息。
测试
步骤1:开启一个终端:输入“mosquitto”命令,结果如下图,服务启动,因为一直监听,所以不会看到命令行。
正常情况
输入“mosquitto”如果如下图报错,发现报错是地址已被使用,可以使用 "ps -e "查看进程和“netstat -apn | grep :1883”来查看谁占用端口,可使用“kill -s 9 pid号”杀死该进程,然后重新输入“mosquitto”命令即可得到上图正确结果。
报错
步骤2:开启第二个终端,输入“mosquitto_sub -t 主题名 -i 用户名”, (后面的“-i 用户名可省略”)
例如:mosquitto_sub -t mqtt 结果如图,由于一直监听,所以也不会看到命令行。
发布消息前第二个终端截图
步骤3:开启第三个终端,输入“mosquitto_pub -t 主题名 -i 用户名 -m 要发送的消息”
(如果要发送的消息中有空格,需用引号括起来)
例如:mosquitto_pub -h localhost -t mqtt -m "hello world"
则第二个终端可以收到这条信息。笔者看到其命令行有文件传输,又尝试传一个文件(内容只有一句话),第二个终端会直接显示文件的内容(截图中“hello World”下面的那句话就是)。尝试一个大文件的传输,将一个7M的书传过去,首先是可以传,但是第二个窗口显示的全是乱码,传输的速度也是一个问题。
发布消息后的第二个终端截图
这里之所以会想到传文件是因为看到mosquitto_pub的命令参数中有关于把文件当做message传输的记录,如图:
这里的文件上限默认是256M。逻辑中有对文件大小的判断,超过256M的文件则不传。不知道这里如果吧这个值修改更大,会不会产生影响,笔者没有尝试,因为传7M的文件都感觉很慢。(这个问题在MQTT协议介绍中可以得到答案,MQTT文件长度的表示是用1至4个字节来表示,而其表示长度的方式又有特殊的加密方式,按照这种方式,其最大表示的长度为256M)
测试总结
三个终端,一个用来开启服务,一个执行mosquitto_sub来订阅消息,与服务器保持长连接,随时接收来自服务器推送的消息,最后一个终端则用来发布消息。这个测试的结果现在是正确的,但仍存在局限性,还有以下几个问题需要注意:
1)了解mosquitto_sub和mosquitto_pub命令背后是如何执行的,需要修改,订阅端的处理肯定不能仅仅是显示内容 到标准输出上。
2)了解mosquitto命令的逻辑,这里包含的内容很多,估计也是最难的。
3)这里的实验是在本地传输,需要做一个客户端出来(客户端可能是Android端或者MCU端),看是否可以正常传输,还有就是能传多大的数据,允许同时连入的客户数有多少(据说是20000以上)。
mosquito monitor idea
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"
mosquito 是一个MQTT 服务器。目前只在公司移动互联网服务器上环境部署过,用的比较少,show 一下安装过程:
wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz
tar -zxvf mosquitto-1.0.3.tar.gz
cd mosquitto-1.0.3
make WITH_TLS_PSK=no (遇到undefined reference to `SSL_CTX_set_psk_server_callback' ,添加参数WITH_TLS_PSK=no 禁用SSL PSK 支持)
make install prefix=/home/mosquitto
mkdir /home/mosquitto/etc
mv /etc/mosquitto/* /home/mosquitto/etc/
strip /home/mosquitto/bin/*
strip /homo/mosquitto/sbin/*
strip /homo/mosquitto/lib/*
echo "/home/mosquitto/lib/" >> /etc/ld.so.conf
ldconfig -f /etc/ld.so.conf
修改 /homo/mosquitto/etc/mosquitto.conf 用户:
user nobody
(其它参数目前使用默认值)
启动服务(如有服务相关错误,检查/home/mosquitto/mosquitto.log,端口默认1883)
/home/mosquitto/sbin/mosquitto -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1
终端测试:
客户端
mosquitto_sub -h SERVERIP -t test
服务器端执行后
/home/mosquitto/bin/mosquitto_pub -t test -m "123456"
客户端会成功收到"123456"
查看Mqtt订阅者运行状况
mosquitto_sub -v -t \$SYS/#
或者细化为其中一个命令
mosquitto_sub -v -t ‘$SYS/broker/clients/active’
monitor statitics script
100万并发连接之笔记
epoll详解
select,poll和epoll区别
高并发的epoll多线程实例
mosquitto的pthread无法使用
C1000连接
mosquitto源代码分析
Java NIO 单线程服务器
lighthttp vs apache vs ngix
Java heap space error summary
mqtt benchmark can not create native thread
mosquitto 1.2.1代码优化
mosquitto 100k测试场景
Mosquitto pub/sub代码分析
优化:
对于后续的提高优化的地方,简单记录几点:
发送数据用writev
poll -> epoll ,用以支持更高的冰法;
改为单线程版本,降低锁开销,目前锁开销还是非常大的。目测可以改为单进程版本,类似redis,精心维护的话应该能达到不错的效果;
网络数据读写使用一次尽量多读的方式,避免多次进入系统调用;
内存操作优化。不free,留着下次用;
考虑使用spwan-fcgi的形式或者内置一次启动多个实例监听同一个端口。这样能更好的发挥机器性能,达到更高的性能;
(MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads
MaxProcessMemory 指的是一个进程的最大内存
JVMMemory JVM内存
ReservedOsMemory 保留的操作系统内存
ThreadStackSize 线程栈的大小
在java语言里, 当你创建一个线程的时候,虚拟机会在JVM内存创建一个Thread对象同时创建一个操作系统线程,而这个系统线程的内存用的不是JVMMemory,而是系统中剩下的内存(MaxProcessMemory - JVMMemory - ReservedOsMemory)。
一哥们发送线程数和xss以及max user process 有关
epoll + 多线程实现并发网络连接处理
Mosquitto持久层群推消息实现思路
JAVA使用EPoll来进行NIO处理的方法
JDK 6.0 以及JDK 5.0 update 9 的 nio支持epoll (仅限 Linux 系统 ),对并发idle connection会有大幅度的性能提升,这就是很多网络服务器应用程序需要的。
启用的方法如下:
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
单进程FD上限是最大可以打开文件的数目,
这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max
mosquitto -c /etc/mosquitto/mosquitto.conf -d”即可开启服务
压力测试: 系统参数修改
ulimit -u 12000 (max processes)
ulimit -n 12000 (max files)
测试最大连接数
#!/bin/bash
c=1
while [ $c -le 18000 ]
do
mosquitto_sub -d -t hello/world -k 900 &
(( c++ ))
done
# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
kill -9 `ps -ef|grep mosquitto|awk '{print $2}'`
ps -ef|grep report |grep -v grep|awk '{print $2}' |xargs kill -9
pkill -9 mosquitto
killall -9 mosquitto
查看cpu 占用高的线程
ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu
查看线程的I/O信息
/proc/15938/task/15942/fd
lsof -c mosquitto
[[email protected] fd]# pstack 15938
Thread 5 (Thread 0x7f280e377700 (LWP 15939)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 4 (Thread 0x7f280d976700 (LWP 15940)):
#0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1 0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f280cf75700 (LWP 15941)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f280c574700 (LWP 15942)):
#0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1 0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c7c5 in mosquitto_main_loop_4_epoll ()
#2 0x000000000040414b in main ()
实际测试
Test case 1
1 10934 max connections with QoS 0
[[email protected] ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
ESTABLISHED 10934
LISTEN 12
脚本输出
Client mosqsub/13526-wwwu.mqtt sending CONNECT
Client mosqsub/13526-wwwu.mqtt received CONNACK
Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13526-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/13523-wwwu.mqtt sending CONNECT
Client mosqsub/13523-wwwu.mqtt received CONNACK
Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13523-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Test Case 2:
该测试只是验证能打开的最大连接数,意义不太大
max files:
ulimit -n 15000
[[email protected] test]# netstat -na|grep ESTAB|grep 1883|wc -l
15088
cpu 2 core 只使用100%(total 200%),目前没有publish数据,所以memory使用量很少3%,
还可以继续提升
vmstat
[[email protected] test]# vmstat 3
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 24080 1865596 233880 266228 1 3 2 4 2 11 0 0 99 0 0
1 0 24080 1865588 233880 266228 0 0 0 0 1030 30 17 33 50 0 0
[[email protected] test]# top
top - 03:17:46 up 13 days, 10 min, 4 users, load average: 1.09, 0.83, 0.37
Tasks: 164 total, 2 running, 162 sleeping, 0 stopped, 0 zombie
Cpu0 : 31.2%us, 68.8%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 0.0%us, 0.3%sy, 0.0%ni, 99.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 1091948k used, 1865588k free, 233880k buffers
Swap: 3096568k total, 24080k used, 3072488k free, 266228k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6709 root 20 0 150m 113m 784 R 100.0 3.9 15:16.63 mosquitto
发现问题
no route to host;
telnet: Unable to connect to remote host: No route to host
觉得甚是差异,估计是虚拟机装了有问题,就把虚拟机中的防火墙给清了一下,发现可行。
[[email protected] ~]$ sudo iptables -F
mosquitto无法启动
发现netstat -an|grep 1883 没有发现记录,发现mosquitto.db文件很大445M,把该文件删除后,
可以正常启动
2014/05/07 结果
------------------------------------------------------------------------------------------------
【subscriber】
开6000个subscribe active 连接,脚本如下
java -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883
kswapd0 进程跳来跳去
top - 13:16:13 up 31 min, 4 users, load average: 617.83, 187.04, 65.29
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu(s): 2.9%us, 58.7%sy, 0.0%ni, 0.0%id, 34.5%wa, 0.0%hi, 3.9%si, 0.0%st
Mem: 2957536k total, 2899268k used, 58268k free, 984k buffers
Swap: 3096568k total, 805564k used, 2291004k free, 14656k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 2538 root 20 0 25.2g 1.7g 4420 S 122.1 60.3 5:27.12 java
[[email protected] ~]# free -m
total used free shared buffers cached
Mem: 2888 2810 77 0 0 15
id=1,rate=996.20,threads=6000
id=1,rate=1007.30,threads=6000
之后司机
publish 只有700M可用内存
[[email protected] Local]# top
top - 17:13:51 up 3:44, 2 users, load average: 0.00, 0.00, 0.05
Tasks: 124 total, 1 running, 123 sleeping, 0 stopped, 0 zombie
Cpu(s): 14.7%us, 15.2%sy, 0.0%ni, 64.6%id, 0.0%wa, 0.0%hi, 5.4%si, 0.0%st
Mem: 2011348k total, 1736976k used, 274372k free, 23976k buffers
Swap: 8388600k total, 0k used, 8388600k free, 803732k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1946 root 20 0 3362m 591m 7292 S 66.7 30.1 3:22.69 java
最多2500个publish active 连接, throughput 3242 msgs/second,
id=5,rate=3084.80,threads=2000
id=5,rate=2324.60,threads=2000
id=5,rate=3176.30,threads=2000
id=5,rate=3091.40,threads=2000
[mosquitto]
connection 后的top
[[email protected] mosquitto]# top -b
top - 05:50:55 up 2:00, 4 users, load average: 1.00, 0.81, 0.74
Tasks: 161 total, 2 running, 159 sleeping, 0 stopped, 0 zombie
Cpu(s): 31.9%us, 7.4%sy, 0.0%ni, 54.8%id, 3.2%wa, 0.0%hi, 2.6%si, 0.0%st
Mem: 2957536k total, 2518020k used, 439516k free, 31976k buffers
Swap: 3096568k total, 0k used, 3096568k free, 209064k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 R 77.9 42.5 93:35.86 mosquitto
[[email protected] ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l
8000
遇到问题
[[email protected] ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
FIN_WAIT1 298
ESTABLISHED 2372
LISTEN 12
tcp 0 21848 9.119.154.107:1883 9.119.154.160:55851(sub) FIN_WAIT1
tcp 0 37393 9.119.154.107:1883 9.119.154.160:57275 FIN_WAIT1
tcp 0 0 9.119.154.107:1883 9.119.154.160:56913 FIN_WAIT2
tcp 0 40545 9.119.154.107:1883 9.119.154.160:55864 FIN_WAIT1
netstat显示的连接状态有几种WAIT: FIN_WAIT_1,FIN_WAIT_2,CLOSE_WAIT和TIME_WAIT. 他们的含义要从TCP的连接中断过程说起
Server Client
-------- FIN -------->
<------- ACK ---------
<------- FIN ---------
-------- ACK -------->
假设服务器主动关闭连接(Active Close)
服务器首先向客户机发送FIN包,然后服务器进入FIN_WAIT_1状态。
客户机向服务器确认FIN包收到,向服务器发送FIN/ACK,客户机进入CLOSE_WAIT状态。
服务器收到来自客户机的FIN/ACK后,进入FIN_WAIT_2状态
现在客户机进入被动关闭(“passive close”)状态,客户机操作系统等待他上面的应用程序关闭连接。一旦连接被关闭,客户端会发送FIN包到服务器
当服务器收到FIN包后,服务器会向客户机发送FIN/ACK确认,然后进入著名的TIME_WAIT状态
由于在连接关闭后,还不能确定所有连接关闭前的包都被服务器接受到了(包的接受是没有先后顺序的),因此有了TIME_WAIT状态。在这个状态中,服务器仍然在等待客户机发送的但是还未到达服务器的包。这个状态将保持2*MSL的时间,这里的MSL指的是一个TCP包在网络中存在的最长时间。一般情况下2*MSL=240秒。
-------------------------------------------------------------------------------------
10000 sub connections
mosquitto 2G 内存 32% CPU
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu0 : 3.7%us, 11.6%sy, 0.0%ni, 84.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 4.3%us, 13.6%sy, 0.0%ni, 82.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 2202836k used, 754700k free, 34916k buffers
Swap: 3096568k total, 0k used, 3096568k free, 48164k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 S 32.6 42.5 148:35.98 mosquitto
-----------------boss worker multithread model---------------------------
int queue[QUEUE_SIZE];
This is the main thread. It creates a queue struct (defined elsewhere) with methods enqueue,
dequeue, empty, etc. When the server accepts a connection,
it enqueues the socket that the incoming connection is on.
The worker threads which were dispatched at the beginning are constantly checking this queue to
see if any jobs have been added, and if there are jobs, then they dequeue the socket, connect to that port,
and read/parse/write the incoming http request.
int main(int argc, char* argv[])
{
int hSocket, hServerSocket;
struct hostent* pHostInfo;
struct sockaddr_in Address;
int nAddressSize = sizeof(struct sockaddr_in);
int nHostPort;
int numThreads;
int i;
init(&head,&tail);
/
hServerSocket=socket(AF_INET,SOCK_STREAM,0);
if(hServerSocket == SOCKET_ERROR)
{
printf("\nCould not make a socket\n");
return 0;
}
Address.sin_addr.s_addr = INADDR_ANY;
Address.sin_port = htons(nHostPort);
Address.sin_family = AF_INET;
printf("\nBinding to port %d\n",nHostPort);
if(bind(hServerSocket,(struct sockaddr*)&Address,sizeof(Address)) == SOCKET_ERROR) {
printf("\nCould not connect to host\n");
return 0;
}
getsockname(hServerSocket, (struct sockaddr *) &Address,(socklen_t *)&nAddressSize);
printf("Opened socket as fd (%d) on port (%d) for stream i/o\n",hServerSocket, ntohs(Address.sin_port));
printf("Server\n\
sin_family = %d\n\
sin_addr.s_addr = %d\n\
sin_port = %d\n"
, Address.sin_family
, Address.sin_addr.s_addr
, ntohs(Address.sin_port)
);
//Up to this point is boring server set up stuff. I need help below this.
/
if(listen(hServerSocket,QUEUE_SIZE) == SOCKET_ERROR) {
printf("\nCould not listen\n");
return 0;
}
while(1) {
pthread_mutex_lock(&mtx);
printf("\nWaiting for a connection");
while(!empty(head,tail)) {
pthread_cond_wait (&cond2, &mtx);
}
hSocket = accept(hServerSocket,(struct sockaddr*)&Address,(socklen_t *)&nAddressSize);
printf("\nGot a connection");
enqueue(queue,&tail,hSocket);
pthread_mutex_unlock(&mtx);
pthread_cond_signal(&cond); // wake worker thread
}
}
void *worker(void *threadarg) {
while(true)
{
pthread_mutex_lock(&mtx);
while(empty(head,tail)) {
pthread_cond_wait(&cond, &mtx);
}
int hSocket = dequeue(queue,&head);
unsigned nSendAmount, nRecvAmount;
char line[BUFFER_SIZE];
nRecvAmount = read(hSocket,line,sizeof line);
printf("\nReceived %s from client\n",line);
/
if(close(hSocket) == SOCKET_ERROR) {
printf("\nCould not close socket\n");
return 0;
}
pthread_mutex_unlock(&mtx);
pthread_cond_signal(&cond);
}
epoll与select、poll区别
1、相比于select与poll,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。内核中的select与poll的实现是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。
2、epoll的实现是基于回调的,如果fd有期望的事件发生就通过回调函数将其加入epoll就绪队列中,也就是说它只关心“活跃”的fd,与fd数目无关。
3、内核 / 用户空间 内存拷贝问题,如何让内核把 fd消息通知给用户空间呢?在这个问题上select/poll采取了内存拷贝方法。而epoll采用了共享内存的方式。
4、epoll不仅会告诉应用程序有I/0 事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个fd集合。
epoll 的EPOLLLT (水平触发,默认)和 EPOLLET(边沿触发)模式的区别
1、EPOLLLT:完全靠kernel epoll驱动,应用程序只需要处理从epoll_wait返回的fds,这些fds我们认为它们处于就绪状态。此时epoll可以认为是更快速的poll。
2、EPOLLET:此模式下,系统仅仅通知应用程序哪些fds变成了就绪状态,一旦fd变成就绪状态,epoll将不再关注这个fd的任何状态信息,(从epoll队列移除)直到应用程序通过读写操作(非阻塞)触发EAGAIN状态,epoll认为这个fd又变为空闲状态,那么epoll又重新关注这个fd的状态变化(重新加入epoll队列)。随着epoll_wait的返回,队列中的fds是在减少的,所以在大并发的系统中,EPOLLET更有优势,但是对程序员的要求也更高,因为有可能会出现数据读取不完整的问题,举例如下:
假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发,那么这个fd变成就绪状态就会从epoll 队列移除,很可能epoll_wait 会一直阻塞,忽略尚未读取的1k数据,与此同时对方还在等待着我们发送一个回复ack,表示已经接收到数据;如果是电平触发,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。
man epoll example
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
epollfd = epoll_create(10);
if (epollfd == -1) {
perror("epoll_create");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_pwait");
exit(EXIT_FAILURE);
}
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &local, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
Mosquitto简要教程(安装/使用/测试)
上篇文章《Android主流推送方案分析(MQTT/XMPP/GCM)》中,我们给大家介绍了,如何在移动领域使用灵巧的消息传输协议MQTT来完成消息推送,最后也提到了开源项目Mosquitto。实际上,Mosquitto是一个实现了MQTT3.1协议的代理服务器,由MQTT协议创始人之一的Andy Stanford-Clark开发,它为我们提供了非常棒的轻量级数据交换的解决方案。本文的主旨在于记录Mosquitto服务的安装和使用,以备日后查阅。
1、获取&安装
Mosquitto提供了Windows、Linux以及qnx系统的版本,安装文件可从http://mosquitto.org/files/binary/地址中获取(建议使用最新的1.1.x版本)。Windows系统下的安装过程非常简单,我们甚至可以把Mosquitto直接安装成为系统服务;但是,在实际应用中,我们更倾向于使用Linux系统的服务器,接下来我们就将重点介绍Linux版Mosquitto的安装方法。
在Linux系统上安装Mosquitto,本人建议大家使用源码安装模式,最新的源码可从http://mosquitto.org/files/source/地址中获取。解压之后,我们可以在源码目录里面找到主要的配置文件config.mk,其中包含了所有Mosquitto的安装选项,详细的参数说明如下:
[html] view plaincopy
# 是否支持tcpd/libwrap功能.
#WITH_WRAP:=yes
# 是否开启SSL/TLS支持
#WITH_TLS:=yes
# 是否开启TLS/PSK支持
#WITH_TLS_PSK:=yes
# Comment out to disable client client threading support.
#WITH_THREADING:=yes
# 是否使用严格的协议版本(老版本兼容会有点问题)
#WITH_STRICT_PROTOCOL:=yes
# 是否开启桥接模式
#WITH_BRIDGE:=yes
# 是否开启持久化功能
#WITH_PERSISTENCE:=yes
# 是否监控运行状态
#WITH_MEMORY_TRACKING:=yes
这里需要注意的是,默认情况下Mosquitto的安装需要OpenSSL的支持;如果不需要SSL,则需要关闭config.mk里面的某些与SSL功能有关的选项(WITH_TLS、WITH_TLS_PSK)。接着,就是运行make install进行安装,完成之后会在系统命令行里发现mosquitto、mosquitto_passwd、mosquitto_pub和mosquitto_sub四个工具(截图如下),分别用于启动代理、管理密码、发布消息和订阅消息。
2、配置&运行
安装完成之后,所有配置文件会被放置于/etc/mosquitto/目录下,其中最重要的就是Mosquitto的配置文件,即mosquitto.conf,以下是详细的配置参数说明。
[html] view plaincopy
# =================================================================
# General configuration
# =================================================================
# 客户端心跳的间隔时间
#retry_interval 20
# 系统状态的刷新时间
#sys_interval 10
# 系统资源的回收时间,0表示尽快处理
#store_clean_interval 10
# 服务进程的PID
#pid_file /var/run/mosquitto.pid
# 服务进程的系统用户
#user mosquitto
# 客户端心跳消息的最大并发数
#max_inflight_messages 10
# 客户端心跳消息缓存队列
#max_queued_messages 100
# 用于设置客户端长连接的过期时间,默认永不过期
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# 服务绑定的IP地址
#bind_address
# 服务绑定的端口号
#port 1883
# 允许的最大连接数,-1表示没有限制
#max_connections -1
# cafile:CA证书文件
# capath:CA证书目录
# certfile:PEM证书文件
# keyfile:PEM**文件
#cafile
#capath
#certfile
#keyfile
# 必须提供证书以保证数据安全性
#require_certificate false
# 若require_certificate值为true,use_identity_as_username也必须为true
#use_identity_as_username false
# 启用PSK(Pre-shared-key)支持
#psk_hint
# SSL/TSL加密算法,可以使用“openssl ciphers”命令获取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 消息自动保存的间隔时间
#autosave_interval 1800
# 消息自动保存功能的开关
#autosave_on_changes false
# 持久化功能的开关
persistence true
# 持久化DB文件
#persistence_file mosquitto.db
# 持久化DB文件目录
#persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4种日志模式:stdout、stderr、syslog、topic
# none 则表示不记日志,此配置可以提升些许性能
log_dest none
# 选择日志的级别(可设置多项)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否记录客户端连接信息
#connection_messages true
# 是否记录日志时间
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客户端ID的前缀限制,可用于保证安全性
#clientid_prefixes
# 允许匿名用户
#allow_anonymous true
# 用户/密码文件,默认格式:username:password
#password_file
# PSK格式密码文件,默认格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL权限配置,常用语法如下:
# 用户限制:user <username>
# 话题限制:topic [read|write] <topic>
# 正则限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允许服务之间使用“桥接”模式(可用于分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 设置桥接的客户端ID
#clientid
# 桥接断开时,是否清除远程服务器中的消息
#cleansession false
# 是否发布桥接的状态信息
#notifications true
# 设置桥接模式下,消息将会发布到的话题地址
# $SYS/broker/connection/<clientid>/state
#notification_topic
# 设置桥接的keepalive数值
#keepalive_interval 60
# 桥接模式,目前有三种:automatic、lazy、once
#start_type automatic
# 桥接模式automatic的超时时间
#restart_timeout 30
# 桥接模式lazy的超时时间
#idle_timeout 60
# 桥接客户端的用户名
#username
# 桥接客户端的密码
#password
# bridge_cafile:桥接客户端的CA证书文件
# bridge_capath:桥接客户端的CA证书目录
# bridge_certfile:桥接客户端的PEM证书文件
# bridge_keyfile:桥接客户端的PEM**文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile
# 自己的配置可以放到以下目录中
include_dir /etc/mosquitto/conf.d
最后,启动Mosquitto服务很简单,直接运行命令行“mosquitto -c /etc/mosquitto/mosquitto.conf -d”即可开启服务。接下来,就让我们尽情体验Mosquitto的强大功能吧!当然,有了Mosquitto,我们就可以安心地抛弃“简陋”的rsmb了,有兴趣的话,大家还可以尝试把Mosquitto服务运用到上一篇的Android推送服务中。
另外,Mosquitto是个异步IO框架,经测试可以轻松处理20000个以上的客户端连接。当然,实际的最大承载量还和业务的复杂度还有比较大的关系。下图是本人在一台普通Linux机器上进行的压力测试结果,大家可以参考。
友情提醒:测试的时候不要忘记调整系统的最大连接数和栈大小,比如:Linux上可用ulimit -n20000 -s512命令设置你需要的系统参数。
http://blog.iyunv.com/shagoo/article/details/7910598
https://github.com/mqtt/mqtt.github.io
http://mosquitto.org/download