Mosquitto monitoring idea (monit):
check process mosquitto with pidfile /var/run/mosquitto.pid
start program = "/etc/init.d/mosquitto start"
stop program = "/etc/init.d/mosquitto stop"
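Mosquitto is an MQTT server (broker). So far it has only been deployed in the company's mobile internet server environment and is not used much. Here is the installation process: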
wget http://mosquitto.org/files/source/mosquitto-1.0.3.tar.gz
tar -zxvf mosquitto-.tar.gz
cd mosquitto-
make WITH_TLS_PSK=no (if the build fails with undefined reference to `SSL_CTX_set_psk_server_callback', the WITH_TLS_PSK=no option disables SSL PSK support)
make install prefix=/home/mosquitto
mkdir /home/mosquitto/etc
mv /etc/mosquitto/* /home/mosquitto/etc/
strip /home/mosquitto/bin/*
strip /home/mosquitto/sbin/*
strip /home/mosquitto/lib/*
echo "/home/mosquitto/lib/" >> /etc/ld.so.conf
ldconfig -f /etc/ld.so.conf
Edit /home/mosquitto/etc/mosquitto.conf and set the user:
user nobody
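(all other parameters currently keep their default values)
Start the service (if the service reports errors, check /home/mosquitto/mosquitto.log; the default port is 1883):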
/home/mosquitto/sbin/mosquitto -d -c /home/mosquitto/etc/mosquitto.conf > /home/mosquitto/mosquitto.log 2>&1
Terminal test:
On the client:
mosquitto_sub -h SERVERIP -t test
Then run on the server:
/home/mosquitto/bin/mosquitto_pub -t test -m "123456"
The client receives "123456".
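For illustration, the message mosquitto_pub sends in this test travels as a single MQTT PUBLISH packet. The sketch below builds a QoS 0 PUBLISH by hand, following the fixed-header layout that MQTT 3.1 and 3.1.1 share (packet type in the high nibble of the first byte, then the variable-length "remaining length" field). The function names are hypothetical, not libmosquitto's API, and a real client would first have to send CONNECT and receive CONNACK.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Encode the MQTT "remaining length" field: 7 bits per byte, least significant
   group first, high bit set when more bytes follow (1 to 4 bytes).
   Returns the number of bytes written to out. */
static size_t mqtt_encode_remaining_length(uint32_t len, uint8_t out[4]) {
    size_t i = 0;
    assert(len <= 268435455u);          /* largest value that fits in 4 bytes */
    do {
        uint8_t byte = (uint8_t)(len % 128);
        len /= 128;
        if (len > 0)
            byte |= 0x80;               /* continuation bit */
        out[i++] = byte;
    } while (len > 0);
    return i;
}

/* Build a QoS 0, non-retained PUBLISH packet in buf (hypothetical helper).
   Returns the packet size, or 0 if the topic is too long or buf is too small. */
static size_t mqtt_build_publish_qos0(const char *topic, const uint8_t *payload,
                                      size_t payload_len, uint8_t *buf, size_t bufsz) {
    size_t topic_len = strlen(topic);
    if (topic_len > 0xFFFF)
        return 0;                       /* topic length is a 16-bit field */
    /* variable header (2-byte length + topic) + payload; QoS 0 has no packet id */
    size_t remaining = 2 + topic_len + payload_len;
    if (remaining > 268435455u)
        return 0;
    uint8_t rl[4];
    size_t rl_len = mqtt_encode_remaining_length((uint32_t)remaining, rl);
    if (1 + rl_len + remaining > bufsz)
        return 0;

    size_t pos = 0;
    buf[pos++] = 0x30;                  /* type 3 = PUBLISH; DUP, QoS, RETAIN all 0 */
    memcpy(buf + pos, rl, rl_len);
    pos += rl_len;
    buf[pos++] = (uint8_t)(topic_len >> 8);     /* topic length, big-endian */
    buf[pos++] = (uint8_t)(topic_len & 0xFF);
    memcpy(buf + pos, topic, topic_len);
    pos += topic_len;
    if (payload_len > 0)
        memcpy(buf + pos, payload, payload_len);
    pos += payload_len;
    return pos;
}
```

For topic test and payload 123456 it returns 14 bytes: 30 0C 00 04, the 4 topic bytes, then the 6 payload bytes.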
Check the MQTT broker's runtime status via the $SYS topics:
mosquitto_sub -v -t \$SYS/#
Or narrow it down to a single topic:
mosquitto_sub -v -t '$SYS/broker/clients/active'
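$SYS/# and the single $SYS topic above are MQTT topic filters. A minimal sketch of how a broker can decide whether a filter matches a published topic, assuming the MQTT 3.1.1 wildcard rules: + matches exactly one level, # matches all remaining levels (including the parent, so sport/# matches sport), and filters that start with a wildcard never match topics that start with $. topic_matches is a hypothetical name, not mosquitto's internal function, and it assumes the filter has already been validated.

```c
#include <stdbool.h>

/* Return true if MQTT topic filter `sub` (may contain + and #) matches `topic`.
   Hypothetical helper; assumes `sub` is a valid filter. */
static bool topic_matches(const char *sub, const char *topic) {
    /* filters starting with a wildcard do not match $-topics such as $SYS/... */
    if (topic[0] == '$' && (sub[0] == '+' || sub[0] == '#'))
        return false;

    while (*sub) {
        if (*sub == '#')                     /* multi-level wildcard: matches the rest */
            return sub[1] == '\0';           /* only valid as the last character */
        if (*sub == '+') {                   /* single-level wildcard: skip one level */
            while (*topic && *topic != '/')
                topic++;
            sub++;
        } else {                             /* literal level: must match exactly */
            while (*sub && *sub != '/') {
                if (*sub != *topic)
                    return false;
                sub++;
                topic++;
            }
            if (*topic && *topic != '/')     /* topic level is longer than the filter's */
                return false;
        }
        if (*sub == '\0')                    /* filter exhausted: topic must be too */
            return *topic == '\0';
        if (*topic == '/') {                 /* both at a level separator: next level */
            sub++;
            topic++;
        } else {                             /* topic exhausted: "a/#" still matches "a" */
            return sub[1] == '#' && sub[2] == '\0';
        }
    }
    return *topic == '\0';
}
```

So topic_matches("$SYS/#", "$SYS/broker/clients/active") is true, while a plain # subscription does not receive $SYS messages.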
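monitor statistics script
Notes on 1 million concurrent connections
epoll in detail
Differences between select, poll and epoll
A high-concurrency multithreaded epoll example
pthread cannot be used in mosquitto
C1000 connections
mosquitto source code analysis
Java NIO single-threaded server
lighttpd vs apache vs nginx
Java heap space error summary
mqtt benchmark: cannot create native thread
mosquitto 1.2.1 code optimization
mosquitto 100k test scenario
Mosquitto pub/sub code analysis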
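Optimizations:
A few notes on where performance could be improved further:
Use writev to send data;
poll -> epoll, to support higher concurrency;
Switch to a single-threaded version to reduce locking overhead, which is still very high at the moment. At first glance it could be turned into a single-process design similar to redis; with careful maintenance this should perform well;
For network reads and writes, read/write as much as possible per call to avoid entering system calls repeatedly;
Memory operations: don't free buffers, keep them for reuse next time;
Consider a spawn-fcgi style setup, or built-in support for starting several instances that listen on the same port, to make better use of the machine and reach higher performance;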
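The first item, sending with writev, means handing the kernel several buffers (for example an MQTT fixed header and its payload) in one system call instead of one write() per buffer. A minimal sketch, assuming a blocking fd; write_all_iov is a hypothetical helper, not a mosquitto function. It loops because writev() may write fewer bytes than requested.

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Write every byte described by iov[0..iovcnt-1] to fd with as few writev()
   calls as possible. Modifies the iov array in place.
   Returns 0 on success, -1 on error (assumes a blocking fd). */
static int write_all_iov(int fd, struct iovec *iov, int iovcnt) {
    while (iovcnt > 0) {
        ssize_t n = writev(fd, iov, iovcnt);
        if (n < 0) {
            if (errno == EINTR)
                continue;               /* interrupted by a signal: retry */
            return -1;
        }
        /* drop the buffers that were written completely */
        while (iovcnt > 0 && (size_t)n >= iov->iov_len) {
            n -= (ssize_t)iov->iov_len;
            iov++;
            iovcnt--;
        }
        /* partially written buffer: advance its start */
        if (iovcnt > 0) {
            iov->iov_base = (char *)iov->iov_base + n;
            iov->iov_len -= (size_t)n;
        }
    }
    return 0;
}
```

Usage: put the header in iov[0] and the payload in iov[1]; both reach the socket in a single writev() call whenever the kernel accepts the whole write.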
(MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize) = Number of threads
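MaxProcessMemory: the maximum memory of a process
JVMMemory: the JVM's memory
ReservedOsMemory: memory reserved for the operating system
ThreadStackSize: the size of one thread stack
In Java, when you create a thread, the JVM creates a Thread object in JVM memory and at the same time creates an operating-system thread. The memory for that OS thread does not come from JVMMemory but from what is left in the process (MaxProcessMemory - JVMMemory - ReservedOsMemory).
A colleague found that the maximum number of threads also depends on -Xss (the thread stack size) and on the max user processes limit (ulimit -u).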
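The formula is plain arithmetic. A worked sketch with purely hypothetical numbers (a 2048 MB process memory limit, a 1024 MB JVM, 128 MB reserved for the OS, and 1 MB thread stacks, i.e. -Xss1m); estimate_max_threads is an illustrative name.

```c
#include <assert.h>

/* (MaxProcessMemory - JVMMemory - ReservedOsMemory) / ThreadStackSize,
   all sizes in KB. Returns 0 if nothing is left for thread stacks. */
static long estimate_max_threads(long max_process_kb, long jvm_kb,
                                 long reserved_os_kb, long thread_stack_kb) {
    assert(thread_stack_kb > 0);
    long remaining = max_process_kb - jvm_kb - reserved_os_kb;
    return remaining > 0 ? remaining / thread_stack_kb : 0;
}
```

With these made-up inputs the estimate is (2048 - 1024 - 128) MB / 1 MB = 896 threads, and halving the stack to 512 KB doubles it to 1792. This is also why a larger heap can mean fewer threads. In practice ulimit -u may cap the number earlier.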
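epoll + multithreading for concurrent network connection handling
Ideas for implementing group push messages on Mosquitto's persistence layer
How Java uses epoll for NIO
JDK 6.0 and JDK 5.0 update 9 support epoll in NIO (Linux only). This greatly improves performance with many concurrent idle connections, which is exactly what many network server applications need.
To enable it: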
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
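The upper limit on FDs is the maximum number of files that can be opened.
This number is usually far larger than 2048; for example, on a machine with 1 GB of RAM it is around 100,000. The system-wide value is in /proc/sys/fs/file-max (cat it to see the exact number); the per-process limit is set separately with ulimit -n.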
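Both limits can also be read from a program. A minimal Linux-only sketch: the per-process limit comes from getrlimit(RLIMIT_NOFILE), the same value ulimit -n shows, and the system-wide one from /proc/sys/fs/file-max. The helper names are hypothetical.

```c
#include <stdio.h>
#include <sys/resource.h>

/* Per-process soft limit on open files, i.e. what `ulimit -n` shows; -1 on error. */
static long fd_limit_per_process(void) {
    struct rlimit rl;
    if (getrlimit(RLIMIT_NOFILE, &rl) != 0)
        return -1;
    return (long)rl.rlim_cur;
}

/* System-wide maximum number of open files (Linux only); -1 on error. */
static long fd_limit_system_wide(void) {
    long value = -1;
    FILE *f = fopen("/proc/sys/fs/file-max", "r");
    if (f == NULL)
        return -1;
    if (fscanf(f, "%ld", &value) != 1)
        value = -1;
    fclose(f);
    return value;
}
```

A broker that wants 10,000+ connections needs the per-process value raised well above the common default of 1024, not just a large file-max.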
Run mosquitto -c /etc/mosquitto/mosquitto.conf -d to start the service.
Stress test: adjust system parameters
ulimit -u 12000 (max processes)
ulimit -n 12000 (max files)
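ulimit only changes the limits of the current shell and the processes it starts. A test client can also raise its own soft limit up to the hard limit with setrlimit(); a minimal sketch (raising the hard limit itself needs root or CAP_SYS_RESOURCE). raise_nofile_limit is a hypothetical helper.

```c
#include <sys/resource.h>

/* Try to raise this process's soft RLIMIT_NOFILE to `wanted`, capped at the
   hard limit. Returns the resulting soft limit, or -1 on error. */
static long raise_nofile_limit(rlim_t wanted) {
    struct rlimit rl;
    if (getrlimit(RLIMIT_NOFILE, &rl) != 0)
        return -1;
    if (rl.rlim_max != RLIM_INFINITY && wanted > rl.rlim_max)
        wanted = rl.rlim_max;           /* unprivileged: cannot exceed the hard limit */
    if (wanted > rl.rlim_cur) {
        rl.rlim_cur = wanted;
        if (setrlimit(RLIMIT_NOFILE, &rl) != 0)
            return -1;
    }
    return (long)rl.rlim_cur;
}
```

Calling raise_nofile_limit(12000) at startup mirrors ulimit -n 12000 for that one process, as far as the hard limit allows.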
Test the maximum number of connections:
#!/bin/bash
c=1
while [ $c -le 18000 ]
do
mosquitto_sub -d -t hello/world -k 900 &
(( c++ ))
done
# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
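The netstat | awk one-liner counts sockets per TCP state. On Linux the same numbers can be read straight from /proc/net/tcp and /proc/net/tcp6, whose 4th column ("st") is the state as a hex number (01 = ESTABLISHED, 0A = LISTEN, ...). A minimal sketch; the file layout is Linux-specific and the helper names are hypothetical.

```c
#include <stdio.h>

#define TCP_STATE_COUNT 12
/* Index = state number used by the Linux kernel (include/net/tcp_states.h). */
static const char *const TCP_STATE_NAMES[TCP_STATE_COUNT] = {
    "UNKNOWN", "ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", "FIN_WAIT2",
    "TIME_WAIT", "CLOSE", "CLOSE_WAIT", "LAST_ACK", "LISTEN", "CLOSING"
};

/* Add per-state socket counts from a /proc/net/tcp-style file to counts[].
   Returns the number of sockets parsed, or -1 if the file cannot be opened. */
static int count_tcp_states(const char *path, long counts[TCP_STATE_COUNT]) {
    char line[512];
    unsigned state;
    int parsed = 0;
    FILE *f = fopen(path, "r");
    if (f == NULL)
        return -1;
    if (fgets(line, sizeof line, f) != NULL) {         /* skip the header line */
        while (fgets(line, sizeof line, f) != NULL) {
            /* columns: sl local_address rem_address st ... ; st is hex */
            if (sscanf(line, "%*s %*s %*s %x", &state) == 1) {
                counts[state < TCP_STATE_COUNT ? state : 0]++;
                parsed++;
            }
        }
    }
    fclose(f);
    return parsed;
}

/* Print "STATE count" lines like the netstat | awk one-liner (tcp + tcp6). */
static void print_tcp_state_summary(void) {
    long counts[TCP_STATE_COUNT] = {0};
    count_tcp_states("/proc/net/tcp", counts);
    count_tcp_states("/proc/net/tcp6", counts);    /* absent if IPv6 is disabled */
    for (int i = 1; i < TCP_STATE_COUNT; i++)
        if (counts[i] > 0)
            printf("%s %ld\n", TCP_STATE_NAMES[i], counts[i]);
}
```

Reading /proc directly avoids spawning netstat and awk, which matters when polling a box with tens of thousands of sockets.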
kill -9 `ps -ef|grep mosquitto|awk '{print $2}'`
ps -ef|grep report |grep -v grep|awk '{print $2}' |xargs kill -9
pkill -9 mosquitto
killall -9 mosquitto
Find the threads with high CPU usage:
ps H -eo user,pid,ppid,tid,time,%cpu,cmd --sort=%cpu
Inspect a thread's I/O (its open file descriptors):
/proc/15938/task/15942/fd
lsof -c mosquitto
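Listing /proc/<pid>/task/<tid>/fd or running lsof -c mosquitto shows which sockets and files the broker holds. Threads of one process share a single fd table, so /proc/<pid>/fd shows the same descriptors. Counting them from C is a readdir() loop; a minimal Linux-only sketch, with count_open_fds a hypothetical helper.

```c
#include <dirent.h>
#include <stdio.h>
#include <sys/types.h>

/* Count the open file descriptors of process `pid` (0 = the calling process)
   by listing /proc/<pid>/fd. Returns -1 if the directory cannot be read,
   e.g. the process does not exist or belongs to another user. */
static int count_open_fds(pid_t pid) {
    char path[64];
    struct dirent *entry;
    int count = 0;
    if (pid == 0)
        snprintf(path, sizeof path, "/proc/self/fd");
    else
        snprintf(path, sizeof path, "/proc/%ld/fd", (long)pid);
    DIR *dir = opendir(path);
    if (dir == NULL)
        return -1;
    while ((entry = readdir(dir)) != NULL)
        if (entry->d_name[0] != '.')        /* skip "." and ".." */
            count++;
    closedir(dir);
    /* for our own process the listing includes the fd opendir() was using */
    return pid == 0 ? count - 1 : count;
}
```

For example, count_open_fds(15938), run as root or as the broker's user, gives the broker's total number of open sockets and files.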
[root@wwwu fd]# pstack 15938
Thread 5 (Thread 0x7f280e377700 (LWP 15939)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 4 (Thread 0x7f280d976700 (LWP 15940)):
#0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1 0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 3 (Thread 0x7f280cf75700 (LWP 15941)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c1d0 in mosquitto_main_loop_4_client ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 2 (Thread 0x7f280c574700 (LWP 15942)):
#0 0x000000000040a464 in mqtt3_db_message_timeout_check_epoll ()
#1 0x000000000040bc3b in mosquitto_main_loop_4_client_all ()
#2 0x0000003b15e079d1 in start_thread () from /lib64/libpthread.so.0
#3 0x0000003b15ae8b6d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f280e59c7c0 (LWP 15938)):
#0 0x0000003b15ae9163 in epoll_wait () from /lib64/libc.so.6
#1 0x000000000040c7c5 in mosquitto_main_loop_4_epoll ()
#2 0x000000000040414b in main ()
Actual tests
Test case 1
Result: a maximum of 10934 connections with QoS 0
[root@ ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
ESTABLISHED 10934
LISTEN 12
Script output:
Client mosqsub/13526-wwwu.mqtt sending CONNECT
Client mosqsub/13526-wwwu.mqtt received CONNACK
Client mosqsub/13526-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13526-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Client mosqsub/13523-wwwu.mqtt sending CONNECT
Client mosqsub/13523-wwwu.mqtt received CONNACK
Client mosqsub/13523-wwwu.mqtt sending SUBSCRIBE (Mid: 1, Topic: hello/world, QoS: 0)
Client mosqsub/13523-wwwu.mqtt received SUBACK
Subscribed (mid: 1): 0
Test Case 2:
This test only verifies the maximum number of connections that can be opened, so it is not very meaningful.
max files:
ulimit -n 15000
[root@wwwu test]# netstat -na|grep ESTAB|grep 1883|wc -l
15088
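The machine has 2 CPU cores and mosquitto uses only 100% (out of 200% total). No data is being published yet, so memory usage is very low (3%).
The number of connections can still be pushed higher.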
vmstat
[root@wwwu test]# vmstat 3
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 24080 1865596 233880 266228 1 3 2 4 2 11 0 0 99 0 0
1 0 24080 1865588 233880 266228 0 0 0 0 1030 30 17 33 50 0 0
[root@wwwu test]# top
top - 03:17:46 up 13 days, 10 min, 4 users, load average: 1.09, 0.83, 0.37
Tasks: 164 total, 2 running, 162 sleeping, 0 stopped, 0 zombie
Cpu0 : 31.2%us, 68.8%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 0.0%us, 0.3%sy, 0.0%ni, 99.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 1091948k used, 1865588k free, 233880k buffers
Swap: 3096568k total, 24080k used, 3072488k free, 266228k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6709 root 20 0 150m 113m 784 R 100.0 3.9 15:16.63 mosquitto
Problems found
no route to host;
telnet: Unable to connect to remote host: No route to host
This was quite surprising. Suspecting a problem with the VM setup, I flushed the firewall rules inside the VM, and after that it worked:
[zhoulei@localhost ~]$ sudo iptables -F
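mosquitto fails to start
netstat -an|grep 1883 showed nothing listening. The mosquitto.db file had grown very large (445 MB); after deleting it,
mosquitto started normally. (Deleting mosquitto.db discards the persisted data, such as retained messages and persistent subscriptions.)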
2014/05/07 results
------------------------------------------------------------------------------------------------
[subscriber]
Open 6000 active subscriber connections with the following command:
java -Xms150M -Xmx350M -Djava.ext.dirs=./ -cp /usr/Paul MQTT31PerfHarness -su -nt 6000 -ss 10 -sc BasicStats -rl 0 -wp true -wc 50 -wi 60 -wt 90 -id 1 -qos 2 -ka 600 -cs false -tc mqtt.Subscriber -d TOPICSINGEL -db 1 -dx 20000 -dn 1 -iu tcp://9.119.154.107:1883
kswapd0 keeps popping up in top (the machine is swapping heavily):
top - 13:16:13 up 31 min, 4 users, load average: 617.83, 187.04, 65.29
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu(s): 2.9%us, 58.7%sy, 0.0%ni, 0.0%id, 34.5%wa, 0.0%hi, 3.9%si, 0.0%st
Mem: 2957536k total, 2899268k used, 58268k free, 984k buffers
Swap: 3096568k total, 805564k used, 2291004k free, 14656k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2538 root 20 0 25.2g 1.7g 4420 S 122.1 60.3 5:27.12 java
[root@wwwu ~]# free -m
total used free shared buffers cached
Mem: 2888 2810 77 0 0 15
id=1,rate=996.20,threads=6000
id=1,rate=1007.30,threads=6000
After that, the machine hung.
Publisher side: only 700 MB of memory available
[root@oc8050533176 Local]# top
top - 17:13:51 up 3:44, 2 users, load average: 0.00, 0.00, 0.05
Tasks: 124 total, 1 running, 123 sleeping, 0 stopped, 0 zombie
Cpu(s): 14.7%us, 15.2%sy, 0.0%ni, 64.6%id, 0.0%wa, 0.0%hi, 5.4%si, 0.0%st
Mem: 2011348k total, 1736976k used, 274372k free, 23976k buffers
Swap: 8388600k total, 0k used, 8388600k free, 803732k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1946 root 20 0 3362m 591m 7292 S 66.7 30.1 3:22.69 java
At most 2500 active publish connections, throughput 3242 msgs/second:
id=5,rate=3084.80,threads=2000
id=5,rate=2324.60,threads=2000
id=5,rate=3176.30,threads=2000
id=5,rate=3091.40,threads=2000
[mosquitto]
top after the connections were established:
[root@wwwu mosquitto]# top -b
top - 05:50:55 up 2:00, 4 users, load average: 1.00, 0.81, 0.74
Tasks: 161 total, 2 running, 159 sleeping, 0 stopped, 0 zombie
Cpu(s): 31.9%us, 7.4%sy, 0.0%ni, 54.8%id, 3.2%wa, 0.0%hi, 2.6%si, 0.0%st
Mem: 2957536k total, 2518020k used, 439516k free, 31976k buffers
Swap: 3096568k total, 0k used, 3096568k free, 209064k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 R 77.9 42.5 93:35.86 mosquitto
[root@wwwu ~]# netstat -an|grep ESTABLISH|grep 1883|wc -l
8000
Problem encountered:
[root@wwwu ~]# netstat -na | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'
CLOSE_WAIT 1
FIN_WAIT1 298
ESTABLISHED 2372
LISTEN 12
tcp 0 21848 9.119.154.107:1883 9.119.154.160:55851(sub) FIN_WAIT1
tcp 0 37393 9.119.154.107:1883 9.119.154.160:57275 FIN_WAIT1
tcp 0 0 9.119.154.107:1883 9.119.154.160:56913 FIN_WAIT2
tcp 0 40545 9.119.154.107:1883 9.119.154.160:55864 FIN_WAIT1
netstat shows several WAIT states: FIN_WAIT_1, FIN_WAIT_2, CLOSE_WAIT and TIME_WAIT. To understand them, start from how a TCP connection is torn down:
Server Client
-------- FIN -------->
<------- ACK ---------
<------- FIN ---------
-------- ACK -------->
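Assume the server closes the connection first (active close):
The server first sends a FIN to the client and enters FIN_WAIT_1.
The client acknowledges the FIN by sending an ACK to the server and enters CLOSE_WAIT.
When the server receives this ACK, it enters FIN_WAIT_2.
The client is now in passive close: its operating system waits for the application on top of it to close the connection. Once the application closes it, the client sends its own FIN to the server (and enters LAST_ACK).
When the server receives that FIN, it replies with an ACK and enters the well-known TIME_WAIT state.
TIME_WAIT exists because, after the close, the server cannot be sure that every segment sent before the close has arrived (segments can be delayed and arrive out of order), nor that its final ACK reached the client (if it did not, the client retransmits its FIN and the server must still be able to ACK it). In this state the server keeps waiting for segments from the client that have not arrived yet. The state lasts 2*MSL, where MSL (Maximum Segment Lifetime) is the longest time a TCP segment can exist in the network. RFC 793 suggests an MSL of 2 minutes, giving 2*MSL = 240 seconds; Linux uses a fixed TIME_WAIT of 60 seconds.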
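The teardown sequence above is a small state machine, the close-related part of the TCP state diagram in RFC 793. A minimal sketch encoding just those transitions; the enum and event names are made up for illustration, and simultaneous close (CLOSING) and resets are deliberately left out.

```c
/* Close-related TCP states (illustrative names). */
enum tcp_state { ST_ESTABLISHED, ST_FIN_WAIT_1, ST_FIN_WAIT_2, ST_TIME_WAIT,
                 ST_CLOSE_WAIT, ST_LAST_ACK, ST_CLOSED };

enum tcp_event {
    EV_APP_CLOSE,    /* local application calls close(): send FIN */
    EV_RECV_FIN,     /* FIN received from the peer: send ACK */
    EV_RECV_ACK,     /* ACK of our FIN received */
    EV_2MSL_TIMEOUT  /* TIME_WAIT timer expired */
};

/* Return the next state, or the current state if the event does not apply. */
static enum tcp_state tcp_close_step(enum tcp_state s, enum tcp_event ev) {
    switch (s) {
    case ST_ESTABLISHED:
        if (ev == EV_APP_CLOSE) return ST_FIN_WAIT_1;   /* active close */
        if (ev == EV_RECV_FIN)  return ST_CLOSE_WAIT;   /* passive close */
        break;
    case ST_FIN_WAIT_1:
        if (ev == EV_RECV_ACK)  return ST_FIN_WAIT_2;
        break;
    case ST_FIN_WAIT_2:
        if (ev == EV_RECV_FIN)  return ST_TIME_WAIT;    /* we ACK the peer's FIN */
        break;
    case ST_TIME_WAIT:
        if (ev == EV_2MSL_TIMEOUT) return ST_CLOSED;
        break;
    case ST_CLOSE_WAIT:
        if (ev == EV_APP_CLOSE) return ST_LAST_ACK;     /* application finally closes */
        break;
    case ST_LAST_ACK:
        if (ev == EV_RECV_ACK)  return ST_CLOSED;
        break;
    case ST_CLOSED:
        break;
    }
    return s;
}
```

The server walks the active-close path (FIN_WAIT_1, FIN_WAIT_2, TIME_WAIT) and the client the passive one (CLOSE_WAIT, LAST_ACK). A socket stuck in CLOSE_WAIT is therefore one whose application never called close().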
-------------------------------------------------------------------------------------
10000 sub connections
mosquitto: 2 GB of memory, 32% CPU
Tasks: 161 total, 1 running, 160 sleeping, 0 stopped, 0 zombie
Cpu0 : 3.7%us, 11.6%sy, 0.0%ni, 84.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 4.3%us, 13.6%sy, 0.0%ni, 82.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 2957536k total, 2202836k used, 754700k free, 34916k buffers
Swap: 3096568k total, 0k used, 3096568k free, 48164k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
2509 root 20 0 1264m 1.2g 740 S 32.6 42.5 148:35.98 mosquitto
-----------------boss worker multithread model---------------------------
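The main (boss) thread owns a bounded queue of connected sockets: a circular int array guarded by a mutex and two condition variables. Whenever the server accepts a connection, it enqueues the connected socket. The worker threads, dispatched once at startup, sleep on a condition variable until the queue is non-empty (no busy polling), then dequeue a socket, read/parse/write the incoming request on that socket and close it. The mutex is held only while the queue itself is touched, never while blocking in accept() or doing socket I/O; otherwise the boss and the workers would serialize each other.
#include <netinet/in.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#define QUEUE_SIZE 64                     /* example values */
#define BUFFER_SIZE 1024
#define NUM_THREADS 4
#define SOCKET_ERROR (-1)
int queue[QUEUE_SIZE];                    /* circular buffer of connected sockets */
int head = 0, tail = 0, count = 0;        /* all protected by mtx */
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    /* "not empty": wakes a worker */
pthread_cond_t cond2 = PTHREAD_COND_INITIALIZER;   /* "not full": wakes the boss */
void enqueue(int hSocket)
{
    pthread_mutex_lock(&mtx);
    while (count == QUEUE_SIZE)           /* queue full: wait for a worker */
        pthread_cond_wait(&cond2, &mtx);
    queue[tail] = hSocket;
    tail = (tail + 1) % QUEUE_SIZE;
    count++;
    pthread_cond_signal(&cond);           /* wake one worker */
    pthread_mutex_unlock(&mtx);
}
int dequeue(void)
{
    int hSocket;
    pthread_mutex_lock(&mtx);
    while (count == 0)                    /* queue empty: wait for the boss */
        pthread_cond_wait(&cond, &mtx);
    hSocket = queue[head];
    head = (head + 1) % QUEUE_SIZE;
    count--;
    pthread_cond_signal(&cond2);          /* a slot is free again */
    pthread_mutex_unlock(&mtx);
    return hSocket;
}
void *worker(void *threadarg)
{
    (void)threadarg;
    while (true) {
        int hSocket = dequeue();          /* returns with the lock released */
        char line[BUFFER_SIZE];
        ssize_t nRecvAmount = read(hSocket, line, sizeof line - 1);
        if (nRecvAmount > 0) {
            line[nRecvAmount] = '\0';     /* read() does not NUL-terminate */
            printf("\nReceived %s from client\n", line);
        }
        /* parse the request and write the response here */
        if (close(hSocket) == SOCKET_ERROR)
            printf("\nCould not close socket\n");
    }
    return NULL;
}
int main(int argc, char *argv[])
{
    int hSocket, hServerSocket, i;
    struct sockaddr_in Address;
    socklen_t nAddressSize = sizeof(struct sockaddr_in);
    int nHostPort = (argc > 1) ? atoi(argv[1]) : 8080;  /* port from argv; 8080 is an arbitrary default */
    pthread_t threads[NUM_THREADS];
    hServerSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (hServerSocket == SOCKET_ERROR) {
        printf("\nCould not make a socket\n");
        return 1;
    }
    memset(&Address, 0, sizeof Address);
    Address.sin_addr.s_addr = htonl(INADDR_ANY);
    Address.sin_port = htons(nHostPort);
    Address.sin_family = AF_INET;
    printf("\nBinding to port %d\n", nHostPort);
    if (bind(hServerSocket, (struct sockaddr *)&Address, sizeof(Address)) == SOCKET_ERROR) {
        printf("\nCould not bind to port\n");
        return 1;
    }
    getsockname(hServerSocket, (struct sockaddr *)&Address, &nAddressSize);
    printf("Opened socket as fd (%d) on port (%d) for stream i/o\n", hServerSocket, ntohs(Address.sin_port));
    printf("Server\n sin_family = %d\n sin_addr.s_addr = %u\n sin_port = %d\n",
           Address.sin_family, (unsigned)Address.sin_addr.s_addr, ntohs(Address.sin_port));
    if (listen(hServerSocket, QUEUE_SIZE) == SOCKET_ERROR) {
        printf("\nCould not listen\n");
        return 1;
    }
    for (i = 0; i < NUM_THREADS; i++)     /* dispatch the workers once, up front */
        pthread_create(&threads[i], NULL, worker, NULL);
    while (1) {
        printf("\nWaiting for a connection\n");
        /* accept() blocks, so it is called WITHOUT holding mtx */
        hSocket = accept(hServerSocket, NULL, NULL);
        if (hSocket == SOCKET_ERROR)
            continue;
        printf("\nGot a connection\n");
        enqueue(hSocket);                 /* lock, wait if full, enqueue, signal a worker */
    }
}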
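Differences between epoll and select/poll
1. Compared with select and poll, epoll's biggest advantage is that its efficiency does not drop as the number of monitored fds grows. The kernel implementations of select and poll scan every fd on each call, so the more fds there are, the longer each call takes.
2. epoll is callback-based: when an expected event occurs on an fd, a callback puts that fd on epoll's ready list. In other words, epoll only deals with "active" fds, and its cost does not depend on the total number of fds.
3. Copying between kernel and user space: how does the kernel deliver fd events to user space? select/poll copy the whole fd set between user and kernel space on every call. epoll registers each fd once (epoll_ctl), and epoll_wait copies back only the ready events. (This is often described as epoll "using shared memory", but the Linux implementation copies the ready events into the array passed to epoll_wait.)
4. epoll not only tells the application that I/O events have arrived, it also returns the data (epoll_event.data) that the application filled in when it registered the fd, so the application can go straight to the right connection instead of scanning the whole fd set.
Differences between epoll's EPOLLLT (level-triggered, the default) and EPOLLET (edge-triggered) modes (Linux actually defines only the EPOLLET flag; level-triggered is what you get when it is not set):
1. EPOLLLT: driven entirely by the kernel; the application only has to handle the fds returned by epoll_wait, which are considered ready. In this mode epoll can be thought of as a faster poll.
2. EPOLLET: the kernel only tells the application which fds have become ready. Once an fd has been reported, epoll does not report it again until a new event occurs on it (for example, more data arrives), so the application must keep reading/writing with non-blocking calls until it gets EAGAIN. Because reported fds leave the ready queue as epoll_wait returns, EPOLLET has an advantage in highly concurrent systems, but it is more demanding for the programmer, since data can be left partly unread. For example:
Suppose the peer sends 2 KB of data and we read only 1 KB before calling epoll_wait again. With edge triggering the fd has already been reported, so epoll_wait may block indefinitely while the remaining 1 KB sits unread, and meanwhile the peer is waiting for our reply acknowledging the data. With level triggering, epoll_wait reports the fd as readable again and we can read the remaining 1 KB.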
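The 2 KB / 1 KB example can be reproduced with a pipe standing in for the socket. A minimal Linux sketch: it writes 2048 bytes, takes the first readiness event, reads only 1024 bytes, and then asks epoll_wait with a zero timeout whether the fd is still reported readable. Under EPOLLET it is not, because no new data arrived; in the default level-triggered mode it is. drain_fd shows the usual ET remedy of reading until EAGAIN. Both helper names are hypothetical.

```c
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* The usual EPOLLET remedy: read a non-blocking fd until EAGAIN.
   Returns the total number of bytes read, or -1 on a real error. */
static ssize_t drain_fd(int fd) {
    char buf[1024];
    ssize_t total = 0;
    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) {
            total += n;
        } else if (n == 0) {
            return total;                       /* EOF: writer closed */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return total;                       /* nothing left for now */
        } else if (errno != EINTR) {
            return -1;
        }
    }
}

/* Write 2048 bytes into a pipe, take the first readiness event, then read only
   1024 bytes (drain == 0) or everything (drain != 0) and poll again with a zero
   timeout. `mode` is 0 (level-triggered) or EPOLLET. Returns the second
   epoll_wait() result: 1 = still reported readable, 0 = not reported, -1 = error. */
static int second_wait_result(unsigned mode, int drain) {
    int fds[2], ep, result = -1;
    char data[2048], buf[1024];
    struct epoll_event ev, out;

    if (pipe(fds) != 0)
        return -1;
    ep = epoll_create1(0);
    memset(data, 'x', sizeof data);
    memset(&ev, 0, sizeof ev);
    ev.events = EPOLLIN | mode;
    ev.data.fd = fds[0];
    if (ep != -1
        && fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0
        && epoll_ctl(ep, EPOLL_CTL_ADD, fds[0], &ev) == 0
        && write(fds[1], data, sizeof data) == (ssize_t)sizeof data
        && epoll_wait(ep, &out, 1, 0) == 1) {          /* first event: 2 KB arrived */
        ssize_t got = drain ? drain_fd(fds[0]) : read(fds[0], buf, sizeof buf);
        if (got > 0)
            result = epoll_wait(ep, &out, 1, 0);       /* 0 ms timeout: never blocks */
    }
    close(fds[0]);
    close(fds[1]);
    if (ep != -1)
        close(ep);
    return result;
}
```

In a real server the second call would use a -1 timeout, so under EPOLLET with a partial read it would block with 1 KB still unread, which is exactly the situation described above; draining until EAGAIN avoids it.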
man epoll example
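#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#define MAX_EVENTS 10
/* Put fd into non-blocking mode (required with EPOLLET). */
static void setnonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl");
        exit(EXIT_FAILURE);
    }
}
/* Edge-triggered handler: keep reading until EAGAIN, echo the data back, and
   close on EOF or error (closing also removes the fd from the epoll set). */
static void do_use_fd(int fd) {
    char buf[4096];
    ssize_t n;
    while ((n = read(fd, buf, sizeof buf)) > 0 || (n == -1 && errno == EINTR)) {
        if (n > 0 && write(fd, buf, (size_t)n) == -1)   /* short writes ignored for brevity */
            perror("write");
    }
    if (n == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
        close(fd);
}
int main(void) {
    struct epoll_event ev, events[MAX_EVENTS];
    struct sockaddr_in local = {0};
    socklen_t addrlen;
    int listen_sock, conn_sock, nfds, epollfd, n;
    /* Listening socket setup, omitted in the man page (port 1883 is only an example). */
    local.sin_family = AF_INET;
    local.sin_addr.s_addr = htonl(INADDR_ANY);
    local.sin_port = htons(1883);
    listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_sock == -1 || bind(listen_sock, (struct sockaddr *) &local, sizeof local) == -1
            || listen(listen_sock, SOMAXCONN) == -1) {
        perror("listen_sock");
        exit(EXIT_FAILURE);
    }
    epollfd = epoll_create(10);   /* size is ignored since Linux 2.6.8, but must be > 0 */
    if (epollfd == -1) {
        perror("epoll_create");
        exit(EXIT_FAILURE);
    }
    ev.events = EPOLLIN;
    ev.data.fd = listen_sock;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
        perror("epoll_ctl: listen_sock");
        exit(EXIT_FAILURE);
    }
    for (;;) {
        nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listen_sock) {
                addrlen = sizeof local;
                conn_sock = accept(listen_sock, (struct sockaddr *) &local, &addrlen);
                if (conn_sock == -1) {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }
                setnonblocking(conn_sock);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = conn_sock;
                if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
                    perror("epoll_ctl: conn_sock");
                    exit(EXIT_FAILURE);
                }
            } else {
                do_use_fd(events[n].data.fd);
            }
        }
    }
}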