mina服务管理器
package com.zxtx.apps.traffic.server;
import org.apache.commons.lang.StringUtils;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.zxtx.apps.traffic.client.Configure;
import com.zxtx.apps.traffic.client.MessageDecoder;
import com.zxtx.apps.traffic.client.MessageEncoder;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
 * Manages the MINA socket acceptor used by the traffic server.
 *
 * <p>The {@link Configure} and {@link MinaServerHandler} collaborators are supplied through the
 * setters and are read by {@link #start()}.
 *
 * Created by Tonny on 2016/8/1.
 */
public class MinaServerHelper {
    /** Size passed to the session send buffer setting. */
    public static final int SENDBUFFERSIZE = 1024;

    /** Size passed to the session receive buffer setting. */
    public static final int RECEIVEBUFFERSIZE = 1024;

    /** Value passed to {@code setReaderIdleTime}. */
    private static final int READER_IDLE_TIME = 30;

    /** The MINA acceptor; created by {@link #start()}. */
    private NioSocketAcceptor acceptor;

    /** Business handler that receives the session events. */
    private MinaServerHandler minaServerHandler;

    /** Server settings (thread pool bounds, backlog, host and port). */
    private Configure configure;

    /**
     * Creates a new acceptor, installs the filter chain, applies the socket options and binds
     * the configured address.
     *
     * @throws IOException if the address cannot be bound
     * @throws IllegalStateException if no {@link Configure} has been set
     */
    public void start() throws IOException {
        if (configure == null) {
            // Fail with a clear message instead of a NullPointerException further down.
            throw new IllegalStateException("configure must be set before start()");
        }

        acceptor = new NioSocketAcceptor();

        // Thread pool for event handling. The filter name (including its spelling) is kept
        // as-is because other code may look the filter up by name.
        acceptor.getFilterChain().addLast("exector",
                new ExecutorFilter(configure.getMinThreads(), configure.getMaxThreads()));
        // Protocol encoder/decoder.
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new MessageEncoder(), new MessageDecoder()));
        // Event logging.
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());

        // Allow the listening address to be reused.
        acceptor.setReuseAddress(true);

        // Report a session as reader-idle after READER_IDLE_TIME without incoming data.
        acceptor.getSessionConfig().setReaderIdleTime(READER_IDLE_TIME);
        // Allow the address of each accepted connection to be reused.
        acceptor.getSessionConfig().setReuseAddress(true);
        // Fix: the receive buffer was previously sized with SENDBUFFERSIZE.
        acceptor.getSessionConfig().setReceiveBufferSize(RECEIVEBUFFERSIZE);
        acceptor.getSessionConfig().setSendBufferSize(SENDBUFFERSIZE);
        // Send data immediately instead of coalescing small writes into larger packets.
        acceptor.getSessionConfig().setTcpNoDelay(true);

        // Maximum length of the pending-connection queue of the listening socket.
        acceptor.setBacklog(configure.getBacklog());

        acceptor.setHandler(minaServerHandler);

        // An empty host or "*" means: listen on the wildcard address.
        String host = configure.getHost();
        if (StringUtils.isEmpty(host) || "*".equals(host)) {
            acceptor.setDefaultLocalAddress(new InetSocketAddress(configure.getPort()));
        } else {
            acceptor.setDefaultLocalAddress(new InetSocketAddress(host, configure.getPort()));
        }

        // Bind; if that fails, release the acceptor's resources instead of leaking them.
        boolean bound = false;
        try {
            acceptor.bind();
            bound = true;
        } finally {
            if (!bound) {
                acceptor.dispose();
            }
        }
    }

    public Configure getConfigure() {
        return configure;
    }

    public void setConfigure(Configure configure) {
        this.configure = configure;
    }

    public MinaServerHandler getMinaServerHandler() {
        return minaServerHandler;
    }

    public void setMinaServerHandler(MinaServerHandler minaServerHandler) {
        this.minaServerHandler = minaServerHandler;
    }

    /**
     * @return the acceptor created by the last {@link #start()} call or injected through
     *         {@link #setAcceptor}; {@code null} if neither has happened
     */
    public NioSocketAcceptor getAcceptor() {
        return acceptor;
    }

    public void setAcceptor(NioSocketAcceptor acceptor) {
        this.acceptor = acceptor;
    }
}
启动服务器
package com.zxtx.apps.traffic;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.mina.core.session.IoSession;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
import com.zxtx.apps.traffic.server.MinaServerHelper;
/**
 * Spring-backed JUnit harness that starts the MINA server and keeps the test thread alive so
 * clients can connect.
 *
 * Created by Tonny on 2016/8/3.
 */
@ContextConfiguration(locations = {
        "classpath*:applicationContext-basic.xml",
        "classpath*:config/applicationContext-mysql-*.xml",
        "classpath*:config/applicationContext-plugin-mybatis-mysql.xml",
        "classpath*:/config/applicationContext-mod-*.xml"})
public class Server extends AbstractJUnit4SpringContextTests {
    /** How long the test thread sleeps after starting the server, in milliseconds. */
    private static final long KEEP_ALIVE_MILLIS = 1200000L;

    @Autowired
    private MinaServerHelper minaServerHelper;

    /**
     * Starts the server and then sleeps for {@link #KEEP_ALIVE_MILLIS}.
     *
     * @throws InterruptedException if the sleeping thread is interrupted
     * @throws IllegalStateException if the server cannot be started
     */
    @Test
    public void start() throws InterruptedException {
        try {
            minaServerHelper.start();
        } catch (IOException e) {
            // Fail fast: previously the stack trace was printed and the test still slept for
            // the whole keep-alive period without a running server.
            throw new IllegalStateException("Failed to start the MINA server", e);
        }
        Thread.sleep(KEEP_ALIVE_MILLIS);
    }

    public MinaServerHelper getMinaServerHelper() {
        return minaServerHelper;
    }

    public void setMinaServerHelper(MinaServerHelper minaServerHelper) {
        this.minaServerHelper = minaServerHelper;
    }

    /**
     * Writes the given message to every session currently managed by the acceptor.
     *
     * @param message the message to send to each connected client
     */
    public void sendConMessage(DefaultMessage message) {
        Map<Long, IoSession> sessions = minaServerHelper.getAcceptor().getManagedSessions();
        // Only the sessions are needed, so iterate the values instead of looking up each key.
        for (IoSession session : sessions.values()) {
            session.write(message);
        }
    }
}
服务端业务处理逻辑:
package com.zxtx.apps.traffic.server;
import java.util.List;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.llama.library.log.LogFactory;
import org.llama.library.log.Logger;
import com.zxtx.apps.traffic.DangerVehicle;
import com.zxtx.apps.traffic.DangerVehicleService;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
import com.zxtx.apps.traffic.vehicle.Device;
import com.zxtx.apps.traffic.vehicle.DeviceQuery;
import com.zxtx.apps.traffic.vehicle.DeviceService;
import com.zxtx.apps.traffic.vehicle.OnlineVehicles;
import com.zxtx.apps.traffic.vehicle.VehicleOnlineQuery;
import com.zxtx.apps.traffic.vehicle.VehicleOnlineService;
/**
 * Server-side MINA handler: logs the session lifecycle, dispatches incoming messages to the
 * registered business handlers and marks the device of a closed session offline.
 *
 * Created by Tonny on 2016/8/1.
 */
public class MinaServerHandler extends ServerHandlerAdapter {
    /** Logger. */
    private Logger log = LogFactory.getLogger(MinaServerHandler.class);

    /** Session attribute key; not referenced by the code in this class. */
    private final AttributeKey context = new AttributeKey(getClass(), "context");

    private DeviceService deviceService;
    private DangerVehicleService dangerVehicleService;
    private VehicleOnlineService vehicleOnlineService;

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        log.debug(session + " created");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        pushMessage(session);
        log.debug(session + " opened");
    }

    /**
     * Marks the device bound to the closed session offline, then logs the close.
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        markDeviceOffline(session);
        super.sessionClosed(session);
        log.debug(session + " closed");
    }

    /**
     * Looks up the device by the session's "token" attribute and reports it offline.
     * Does nothing when the session carries no token or no device matches.
     */
    private void markDeviceOffline(IoSession session) throws Exception {
        Object token = session.getAttribute("token");
        if (!(token instanceof String)) {
            // No token on this session. Skip the lookup rather than query with a null sign.
            // NOTE(review): how DeviceQuery treats a null sign is not visible here — confirm.
            return;
        }
        DeviceQuery query = deviceService.createQuery();
        query.sign((String) token);
        List<Device> devices = query.list();
        if (devices == null || devices.isEmpty()) {
            return;
        }
        Device device = devices.get(0);
        DangerVehicle vehicle = dangerVehicleService.findInfoByPlateNum(device.getPlateNo());
        vehicleOnlineService.offline(device, vehicle);
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        super.sessionIdle(session, status);
        log.debug(session + " idled");
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        super.exceptionCaught(session, cause);
        // Include the cause so the log entry says what went wrong.
        log.debug(session + " exception: " + cause);
    }

    /**
     * Dispatches a received {@link DefaultMessage} to the handler registered for its message
     * id. Messages of any other type, or without a registered handler, are ignored.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        log.debug(session + " received " + message);
        if (!(message instanceof DefaultMessage)) {
            // Guard the cast that used to throw ClassCastException on unexpected payloads.
            log.debug(session + " ignored unexpected message " + message);
            return;
        }
        DefaultMessage defaultMessage = (DefaultMessage) message;
        if (handlerMap != null && handlerMap.get(defaultMessage.getMsgId()) != null) {
            // Process the request and write the response back to the session.
            dealMessage(session, message);
            // Logged only when a handler actually ran.
            log.debug(session + " send " );
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
        log.debug(session + " sended " + message);
    }

    public DeviceService getDeviceService() {
        return deviceService;
    }

    public void setDeviceService(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    public DangerVehicleService getDangerVehicleService() {
        return dangerVehicleService;
    }

    public void setDangerVehicleService(DangerVehicleService dangerVehicleService) {
        this.dangerVehicleService = dangerVehicleService;
    }

    public VehicleOnlineService getVehicleOnlineService() {
        return vehicleOnlineService;
    }

    public void setVehicleOnlineService(VehicleOnlineService vehicleOnlineService) {
        this.vehicleOnlineService = vehicleOnlineService;
    }
}
业务核心实现类
package com.zxtx.apps.traffic.server;
import java.util.Map;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import com.zxtx.apps.traffic.client.MessageBody;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
import com.zxtx.apps.traffic.server.handler.ClientUpgradeHandler;
import com.zxtx.apps.traffic.server.handler.Handler;
/**
 * Base MINA handler that routes messages to business {@link Handler}s by message id.
 *
 * @author zhuwei
 */
public class ServerHandlerAdapter extends IoHandlerAdapter {
    /** Business handlers, keyed by the message id they serve. */
    protected Map<Integer, Handler> handlerMap;

    /**
     * Processes a client message with the handler registered for its message id and writes
     * the resulting response to the session.
     *
     * <p>Nothing is written when {@code message} is not a {@link DefaultMessage} or when no
     * handler is registered for its id.
     *
     * @param session the session the message arrived on; the response is written to it
     * @param message the received message
     */
    protected void dealMessage(IoSession session, Object message) {
        if (!(message instanceof DefaultMessage)) {
            return;
        }
        DefaultMessage request = (DefaultMessage) message;

        // Pick the handler for this business type. An unknown id or an unset map used to end
        // in a NullPointerException here.
        Handler handler = handlerMap == null ? null : handlerMap.get(request.getMsgId());
        if (handler == null) {
            return;
        }

        // Let the handler build the response body and wrap it in a new message.
        MessageBody body = handler.dealMessage(session, request);
        DefaultMessage serverMessage = new DefaultMessage(body);
        serverMessage.setClientId(request.getClientId());
        // NOTE(review): the mobile number is hard-coded — confirm whether it should come
        // from the request or from configuration.
        serverMessage.setMobile("13900139000");

        session.write(serverMessage);
    }

    /**
     * Hook for pushing a message to a session. Currently a no-op: the only implementation
     * ever present (a push through ClientUpgradeHandler) is disabled.
     *
     * @param session the session a message would be pushed to
     */
    protected void pushMessage(IoSession session) {
        // Intentionally empty.
    }

    public Map<Integer, Handler> getHandlerMap() {
        return handlerMap;
    }

    public void setHandlerMap(Map<Integer, Handler> handlerMap) {
        this.handlerMap = handlerMap;
    }
}
业务核心接口
package com.zxtx.apps.traffic.server.handler;
import org.apache.mina.core.session.IoSession;
import com.zxtx.apps.traffic.client.MessageBody;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
/**
* Business handler for one kind of client message.
* ServerHandlerAdapter looks an implementation up by message id and writes the body it
* returns back to the session, wrapped in a new DefaultMessage.
*/
public interface Handler {
/**
* Processes a client message.
* @param session the session the message arrived on
* @param message the received message
* @return the body of the response message
*/
MessageBody dealMessage(IoSession session,DefaultMessage message);
/**
* Builds a message to push to a client.
* NOTE(review): no active caller is visible in this listing — confirm the intended use.
* @param session the session the message is meant for
* @return the message to push
*/
DefaultMessage pushMessage(IoSession session);
}
实际业务实现,例如登录业务处理
package com.zxtx.apps.traffic.server.handler;
import org.apache.mina.core.session.IoSession;
import com.zxtx.apps.traffic.DangerVehicle;
import com.zxtx.apps.traffic.DangerVehicleService;
import com.zxtx.apps.traffic.client.MessageBody;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
import com.zxtx.apps.traffic.client.message.LoginRequestBody;
import com.zxtx.apps.traffic.client.message.ServerCommonResponseBody;
import com.zxtx.apps.traffic.vehicle.Device;
import com.zxtx.apps.traffic.vehicle.DeviceQuery;
import com.zxtx.apps.traffic.vehicle.DeviceService;
import com.zxtx.apps.traffic.vehicle.VehicleOnlineService;
/**
* Login business handler.
*
* NOTE(review): this listing is abridged. The "...." line inside dealMessage stands in for
* omitted code, and no implementation of Handler.pushMessage(IoSession) is visible here.
*
* @author zhuwei
*/
public class LoginHandler implements Handler {
/**
* Core login logic: builds the common server response for a login request.
* @param session the session the request arrived on
* @param message the login request
* @return the response body
*/
@Override
public MessageBody dealMessage(IoSession session, DefaultMessage message) {
ServerCommonResponseBody response = new ServerCommonResponseBody();
// Transaction number: the request's serial number plus one, restarting at 1 after 32767.
response.setTransaction(message.getSn() == 32767 ? 1 : message.getSn() + 1);
....
return response;
}
}
客户端实现:
package com.zxtx.apps.traffic;
import java.net.InetSocketAddress;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.junit.Test;
import org.llama.library.log.LogFactory;
import org.llama.library.log.Logger;
import com.zxtx.apps.traffic.client.MessageDecoder;
import com.zxtx.apps.traffic.client.MessageEncoder;
import com.zxtx.apps.traffic.client.MinaClientHanlder;
import com.zxtx.apps.traffic.server.MinaServerHandler;
/**
 * Test client: connects to the local server, sends the message prepared by the configured
 * client handler and tries to reconnect when the session is closed.
 *
 * @author zhuwei
 */
public class Client {
    /** Host of the server under test. */
    private static final String SERVER_HOST = "127.0.0.1";

    /** Port of the server under test. */
    private static final int SERVER_PORT = 8856;

    /** Pause before each reconnect attempt, in milliseconds. */
    private static final long RECONNECT_DELAY_MILLIS = 3000L;

    /** Logger (previously registered under the server handler's class by mistake). */
    private Logger log = LogFactory.getLogger(Client.class);

    /**
     * Connects to the server and blocks until the session is closed; the connector is always
     * disposed afterwards, even when connecting fails.
     */
    @Test
    public void client() {
        final NioSocketConnector connector = new NioSocketConnector();
        try {
            InetSocketAddress serverAddress = new InetSocketAddress(SERVER_HOST, SERVER_PORT);
            // The reconnect logic uses the no-argument connect(), which requires a default
            // remote address. It was never set before, so every reconnect attempt failed.
            connector.setDefaultRemoteAddress(serverAddress);

            DefaultIoFilterChainBuilder chain = connector.getFilterChain();
            // Protocol encoder/decoder.
            chain.addLast("codec",
                    new ProtocolCodecFilter(new MessageEncoder(), new MessageDecoder()));
            chain.addFirst("reconnection", new IoFilterAdapter() {
                @Override
                public void sessionClosed(NextFilter nextFilter, IoSession ioSession)
                        throws Exception {
                    // Forward the event so the rest of the chain and the handler still see
                    // the close; it used to be dropped here.
                    nextFilter.sessionClosed(ioSession);
                    reconnect(connector);
                }
            });

            // Core message handler: the login test; swap it for other test scenarios.
            MinaClientHanlder mainClientHanlder = new LoginClienTest();
            mainClientHanlder.init();
            connector.setHandler(mainClientHanlder);

            // NOTE(review): this sets the connect-timeout *check interval*; the original
            // comment described it as the connect timeout — confirm which was intended.
            connector.setConnectTimeoutCheckInterval(30);

            ConnectFuture cf = connector.connect(serverAddress);
            // Wait for the connection attempt to finish, then for the session to close.
            cf.awaitUninterruptibly();
            cf.getSession().getCloseFuture().awaitUninterruptibly();
        } finally {
            connector.dispose();
        }
    }

    /**
     * Retries the connection until it succeeds, the connector is disposed or the thread is
     * interrupted. Blocks the calling thread while retrying.
     */
    private void reconnect(NioSocketConnector connector) {
        // Stop once the connector is shutting down; otherwise connect() fails forever.
        while (!connector.isDisposing() && !connector.isDisposed()) {
            try {
                Thread.sleep(RECONNECT_DELAY_MILLIS);
                ConnectFuture future = connector.connect();
                future.awaitUninterruptibly();
                IoSession newSession = future.getSession();
                if (newSession.isConnected()) {
                    log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
                    break;
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt and give up instead of looping on.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception ex) {
                log.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
            }
        }
    }
}
客户端Handler
package com.zxtx.apps.traffic.client;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.llama.library.log.LogFactory;
import org.llama.library.log.Logger;
import com.zxtx.apps.traffic.client.message.ClientCommonResponseBody;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
/**
 * Base client-side MINA handler for the test clients: sends {@link #message} when the session
 * opens and answers every received message with {@link #sendMessage} when one is set.
 */
public abstract class MinaClientHanlder extends IoHandlerAdapter {
    private Logger log = LogFactory.getLogger(MinaClientHanlder.class);

    /** Written when the session opens; later replaced by the last received message. */
    protected DefaultMessage message;

    /** Written in reply to each received message; nothing is replied while it is null. */
    protected DefaultMessage sendMessage;

    /** Prepares the messages this client sends; called by the test before connecting. */
    public abstract void init();

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        log.debug(session + " created");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        if (message != null) {
            session.write(message);
        }
        log.debug(session + " send");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        log.debug(session + " closed");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        super.sessionIdle(session, status);
        log.debug(session + " idled");
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        super.exceptionCaught(session, cause);
        // Include the cause so the log entry says what went wrong.
        log.debug(session + " exception: " + cause);
    }

    /**
     * Remembers the received message and replies with {@link #sendMessage}, if one is set.
     * Messages that are not a {@link DefaultMessage} are logged and otherwise ignored.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        log.debug(session + " 客户端received " + message);
        if (sendMessage == null) {
            return;
        }
        if (!(message instanceof DefaultMessage)) {
            // Guard the cast that used to throw ClassCastException on unexpected payloads.
            log.debug(session + " ignored unexpected message " + message);
            return;
        }
        this.message = (DefaultMessage) message;
        session.write(sendMessage);
        log.debug(session + " 客户端sended " + sendMessage);
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
        log.debug(session + " sended " + message);
    }
}
登录测试请求类
package com.zxtx.apps.traffic;
import com.zxtx.apps.traffic.client.MinaClientHanlder;
import com.zxtx.apps.traffic.client.message.DefaultMessage;
import com.zxtx.apps.traffic.client.message.LoginRequestBody;
/**
 * Login test: prepares the login request that the client sends once its session opens.
 *
 * @author zhuwei
 */
public class LoginClienTest extends MinaClientHanlder {
    /**
     * Builds the login request message and stores it in {@code message}.
     */
    @Override
    public void init() {
        // Login request body carrying the test token.
        LoginRequestBody loginBody = new LoginRequestBody();
        loginBody.setToken("2EE80102DD453AFA7AE6FBACFE5312F1");

        // Wrap the body in a message and fill in the sender details.
        DefaultMessage loginMessage = new DefaultMessage(loginBody);
        this.message = loginMessage;
        loginMessage.setClientId("KGHR");
        loginMessage.setMobile("13900139000");
    }
}