基于NIO的消息路由的实现(六)报文队列的处理
转:https://my.oschina.net/u/2397619/blog/497328
一、报文队列的处理:
如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理;具体如下:
1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQUEUE。
public class GVQueue {
//通讯级别报文的队列
public static BlockingQueue CQUEUE = new ArrayBlockingQueue<Packet>(3000);
//短消息级别报文的队列
public static BlockingQueue MQUEUE = new ArrayBlockingQueue<Packet>(3000);
}
2.CQUEUE队列的消费者线程,专门针对通讯层面的消息进行处理,比如:客户端链路维护的回应等;如下:
public class CQueueConsumer extends Thread {
private int waitTime;
private static Logger logger = LogManager.getLogger(CQueueConsumer.class.getName());
public CQueueConsumer(int waitTime) {
this.waitTime = waitTime;
}
public void run() {
logger.info("通讯队列消费者线程启动……");
boolean isRunning = true;
try {
while (isRunning) {
IPacket packet = (IPacket) GVQueue.CQUEUE.poll(1, TimeUnit.SECONDS);
if (packet != null) {
handleQueue(packet);
/* if (logger.isDebugEnabled()) {
logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
}*/
logger.debug("读出消息队列收到的客户端消息:" + packet.getPacketStr());
} else {
Thread.sleep(waitTime);
if (logger.isDebugEnabled()) {
logger.debug("消息队列中没有消息,休息一会儿……");
}
}
}
} catch (InterruptedException e) {
logger.info("通讯队列消费者处理线程终止……");
e.printStackTrace();
}
}
/**
* 通讯层处理(对除了M报文之外的报文进行处理)
* @param packet
*/
private void handleQueue(IPacket packet) {
//如果是短消息类报文,则直接放入短消息队列等待短消息消费者处理;
if (packet.getHeader().equals(MsgPacket.HEADER)){
GVQueue.MQUEUE.offer(packet);
}
if (!packet.getHeader().equals(ReplyPacket.HEADER)) {
//需要更新通道的最后访问时间
GVConnection gvConn = GVConnTools.getConnByToken(packet.getClientToken());
if (gvConn!=null){
//更改最后访问时间
GVConnTools.updLastAccessTime(packet.getClientToken(),CommonTools.systemTimeUtc());
SocketChannel socketChannel = gvConn.getChannel();
//对客户端的报文做出R相应
if (socketChannel != null) {
ReplyOrder replyOrder = (ReplyOrder)Config.getOrderInstance(ReplyOrder.HEADER);
replyOrder.initReplyOrder(packet.getRid());
GVServer.write2Client(replyOrder, socketChannel);
}
}
}
}
}
3.而MQUEUE队列的消费者线程,则专门针对M类报文进行处理,它的工作是拿出M报文,找到目标通道,然后将报文内容转入目标通道(目前离线存储尚未实现)。如下:
public class MQueueConsumer extends Thread {
private static Logger logger = LogManager.getLogger(MQueueConsumer.class.getName());
public void run() {
logger.info("短消息队列消费者线程启动……");
while (true) {
try {
Packet packet = (Packet) GVQueue.MQUEUE.poll(1, TimeUnit.SECONDS);
if (packet != null) {
// Logs.info("读出消息队列收到的客户端消息:" + packet.getPacketStr());
MsgInfo msgInfo = new MsgInfo();
msgInfo = msgInfo.generaterMsgInfo(packet.getPacketBody());
SocketChannel channel = GVConnTools.getChannelByUserId(msgInfo.getReceiver());
if(channel!=null && channel.isOpen()) {
MsgOrder msgOrder = (MsgOrder) Config.getOrderInstance(MsgOrder.HEADER);
msgOrder.initMsgOrder(packet.getPacketBody());
GVServer.write2Client(msgOrder, channel);
if (logger.isDebugEnabled()) {
logger.debug("短消息发送至:<" + msgInfo.getReceiver() + ">");
}
}else{
/*
此处将数据放入离线存储队列
*/
if(logger.isDebugEnabled()) {
logger.debug("短消息放入离线短消息队列:<" + msgInfo.getReceiver() + ">");
}
}
} else {
Thread.sleep(200);
if(logger.isDebugEnabled()) {
logger.debug("消息队列中没有消息,休息一会儿……");
}
}
} catch (InterruptedException e) {
logger.info("短消息队列消费者处理线程终止……");
e.printStackTrace();
}
}
}
}