Java Socket 多线程通讯:使用 MINA 作为服务端

时间:2022-04-01 22:16:00

客户端代码不变,参照 http://www.cnblogs.com/Westfalen/p/6251473.html

服务端代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * Socket server built on Apache MINA.
 *
 * Required jars: mina-core-2.0.16.jar, slf4j-api-1.7.21.jar
 * Download page: http://mina.apache.org/downloads-mina.html
 */
public class MinaServer {
    /**
     * Configures a NIO acceptor with the message handler and the custom
     * codec, then binds it to port 9999.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Step 1: create the acceptor instance.
        NioSocketAcceptor acceptor = new NioSocketAcceptor();

        // Step 2: register the object that reacts to session events.
        MyServerHandler handler = new MyServerHandler();
        acceptor.setHandler(handler);

        // Step 3: install the codec that turns bytes into messages and back.
        ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(new MyTextLineFactory());
        acceptor.getFilterChain().addLast("codec", codecFilter);
        // Optional idle timeout, left disabled as in the original:
        // acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);

        // Step 4: bind the port to start serving; only this call can throw IOException.
        InetSocketAddress address = new InetSocketAddress(9999);
        try {
            acceptor.bind(address);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Handles session events for the server, keeping message processing
 * decoupled from the network connection set-up.
 */
class MyServerHandler extends IoHandlerAdapter {

    /**
     * Called when an exception is raised while processing a session.
     * Reports the cause so that failures can be diagnosed; the previous
     * version printed only a fixed string and dropped the Throwable.
     *
     * @param session the session on which the error occurred
     * @param cause   the exception that was raised
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        System.out.println("exceptionCaught " + cause);
        if (cause != null) {
            cause.printStackTrace();
        }
    }

    /**
     * Called when a decoded message arrives; prints it.
     *
     * @param session the session that received the message
     * @param message the decoded message object
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        System.out.println("messageReceived " + message);
        // To echo the message straight back to the client, enable:
        // session.write("reply " + message);
    }

    /**
     * Called after a message has been sent; prints it.
     *
     * @param session the session the message was written to
     * @param message the message that was sent
     */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println("messageSent " + message);
    }

    /**
     * Called when the session becomes idle; prints a marker line.
     *
     * @param session the idle session
     * @param status  which kind of idleness was detected
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("sessionIdle");
    }
}

 

package de.bvb.server;

import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

/**
 * Codec factory that pairs {@link MyTextLineEncoder} with
 * {@link MyCumulativeProtocolDecoder}. One encoder and one decoder are
 * created in the constructor and the same instances are returned for
 * every session.
 */
public class MyTextLineFactory implements ProtocolCodecFactory {
    private final ProtocolEncoder encoder;
    private final ProtocolDecoder decoder;

    /** Creates the encoder/decoder pair handed out by this factory. */
    public MyTextLineFactory() {
        this.encoder = new MyTextLineEncoder();
        this.decoder = new MyCumulativeProtocolDecoder();
    }

    /**
     * @param session the session requesting an encoder (not consulted)
     * @return the encoder instance created in the constructor
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return this.encoder;
    }

    /**
     * @param session the session requesting a decoder (not consulted)
     * @return the decoder instance created in the constructor
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return this.decoder;
    }
}

/**
 * Encodes an outgoing message as the bytes of its {@code toString()}
 * text in the platform default charset. No line terminator is appended.
 */
class MyTextLineEncoder implements ProtocolEncoder {

    /**
     * Writes the text form of {@code message} to {@code out}.
     *
     * @param session the session being written to; also caches the
     *                CharsetEncoder under the attribute key "encoder"
     * @param message the object to send; {@code null} is sent as an empty string
     * @param out     receives the encoded buffer
     */
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        // Reuse the encoder stored on the session, creating it on first use.
        Object cached = session.getAttribute("encoder");
        CharsetEncoder sessionEncoder;
        if (cached != null) {
            sessionEncoder = (CharsetEncoder) cached;
        } else {
            sessionEncoder = Charset.defaultCharset().newEncoder();
            session.setAttribute("encoder", sessionEncoder);
        }

        String text = "";
        if (message != null) {
            text = message.toString();
        }

        // Start at one byte per char; auto-expand covers multi-byte characters.
        IoBuffer payload = IoBuffer.allocate(text.length());
        payload.setAutoExpand(true);
        payload.putString(text, sessionEncoder);
        payload.flip();
        out.write(payload);
    }

    /** Nothing to release; the encoder keeps no resources of its own. */
    @Override
    public void dispose(IoSession session) throws Exception {
        // intentionally empty
    }
}

/**
 * Splits the incoming byte stream into messages that each end at a
 * {@code '\n'} byte. Each message is emitted as a String built with the
 * platform default charset and still contains the terminating newline.
 */
class MyCumulativeProtocolDecoder extends CumulativeProtocolDecoder {

    /**
     * Tries to extract one newline-terminated message from {@code in}.
     *
     * @param session the session the data belongs to (not consulted)
     * @param in      buffer holding the bytes received so far
     * @param out     receives the decoded String when a full line is present
     * @return {@code true} if a line was consumed and written to {@code out};
     *         {@code false} if no newline was found, in which case the
     *         buffer position is left where it started
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        final int lineStart = in.position();
        final int end = in.limit();

        // Scan with absolute reads so the position stays put until a line is found.
        for (int index = lineStart; index < end; index++) {
            if (in.get(index) != '\n') {
                continue;
            }
            // Include the newline byte itself in the extracted message.
            byte[] lineBytes = new byte[index + 1 - lineStart];
            // The relative bulk read moves the position just past the newline.
            in.get(lineBytes);
            out.write(new String(lineBytes));
            return true;
        }
        return false;
    }

}