用socketchannel实现多客户端与服务器端的通信

时间:2022-09-05 15:21:18
用socket实现两方通信是很简单的,多方通信的话,java提供NIO非阻塞技术来解决这个问题。

NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。

Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。

自己写了一个代码,怎么也调试不出来,参考了学长的代码(封装的这么漂亮,完全不是一个档次……),不过唯一有点缺陷是改成客户端程序的时候参数的传递好像总是不尽如人意,所以代码还是在控制台输入输出的。

serverSelector 与特定协议间的通信的接口
import java.io.IOException;
import java.nio.channels.*;
/**
 * serverSelector 与特定协议间的通信的借口
 * @author Qing
 *
 */
public interface Protocol {
	/**
	 * 接收socketchannel
	 * @param key
	 * @throws IOException
	 */
	void handleAccept(SelectionKey key)throws IOException;
	/**
	 * 从一个socketchannel读信息
	 * @param key
	 * @throws IOException
	 */
	void handleRead(SelectionKey key)throws IOException;
	/**
	 * 向socketchannel写入信息
	 * @param key
	 * @throws IOException
	 */
	void handleWrite(SelectionKey key)throws IOException;

}


服务器端接口实现类
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * 服务器端接口实现类
 * @author Administrator
 *
 */
public class ProtocolImplement implements Protocol{
	private int buffersize;
	
	public ProtocolImplement(int buffersize){
		this.buffersize = buffersize;
	}
	
	public void handleAccept(SelectionKey key)throws IOException{
		SocketChannel clientchannel = ((ServerSocketChannel)key.channel()).accept();
		clientchannel.configureBlocking(false);
		clientchannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(buffersize));
	}
	
	public void handleRead(SelectionKey key)throws IOException{
		SocketChannel clientchannel = (SocketChannel)key.channel();
		//得到并清空缓冲区
		ByteBuffer buffer  = (ByteBuffer)key.attachment();
		buffer.clear();
		//读取信息获得读取的字节数
		long bytesRead = clientchannel.read(buffer);
		
		if(bytesRead == -1){
			//没有读取到内容
			clientchannel.close();
		}else{
			//将缓冲区准备为数据传出状态
			buffer.flip();
			String receivedString = Charset.forName("UTF-16").newDecoder().decode(buffer).toString();
			//打印
			System.out.println("客户端"+clientchannel.socket().getRemoteSocketAddress()+":"+receivedString);
			//为下一次读取或写入做准备
			key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
		}
	}
	
	public void handleWrite(SelectionKey key)throws IOException{
		SocketChannel clientchannel = (SocketChannel)key.channel();
		ByteBuffer writeBuffer = ByteBuffer.wrap(Server.msg.getBytes("UTF-16"));
		clientchannel.write(writeBuffer);
		key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
	}


}

服务器端写线程
import java.util.Scanner;
/**
 * 
 * @author Administrator
 *
 */
public class ServerWriteThread implements Runnable{
	/**
	 * 服务器端写线程
	 */
	public ServerWriteThread(){
		new Thread(this).start();
	}
	
	public void run(){
		Scanner s= new Scanner(System.in);
		while(true){
			System.out.println("输入:");
			Server.msg = s.next();
			Server.signal = true;
		}
	}

}

服务器端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

import socket.ServerWriteThread;
/**
 * 服务器端
 * @author Administrator
 *
 */
public class Server {
	
	public static String msg = "1";
	public static boolean signal = false;
	//超时时间
	private static final int TimeOut = 3000;
	//缓冲区大小
	private static final int bufferSize = 1024;
	//监听端口
	private static final int Port = 10005;
	
	public static void main(String[] args)throws IOException,InterruptedException{
		//创建选择器
		Selector selector = Selector.open();
		//打开监听信道
		ServerSocketChannel serverchannel = ServerSocketChannel.open();
		//与本地端口绑定
		serverchannel.socket().bind(new InetSocketAddress(Port));
		//设置为非阻塞模式
		serverchannel.configureBlocking(false);
		//将选择器绑定到监听信道
		serverchannel.register(selector, SelectionKey.OP_ACCEPT);
		ProtocolImplement protocol = new ProtocolImplement(bufferSize);
		System.out.println("服务器开启");
		new ServerWriteThread();
		
		while(true){
			//等待某信道就绪或超时
			if(selector.select(TimeOut) == 0){
			continue;
			}
			//取得迭代器,selectedkeys中包含了每个准备好I/O操作的信道的selectionkey
			IteratorkeyIter = selector.selectedKeys().iterator();
			while(keyIter.hasNext()){
				SelectionKey key = keyIter.next();
				try{
					if(key.isAcceptable()){
						protocol.handleAccept(key);
					}
					if(key.isReadable()){
						protocol.handleRead(key);
					}
					if(key.isValid() && key.isWritable()){
						if(signal == true){
							protocol.handleWrite(key);
						}
					}
				}catch(IOException ex){
					
					keyIter.remove();
					continue;
				}
				//移除处理过的键
				keyIter.remove();
			}
			signal = false;
		}
	}

}

客户端线程
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * 客户端写线程
 * @author Administrator
 *
 */
public class ClientThread implements Runnable{
	
	private Selector selector;
	/**
	 * 客户端写进程
	 * @param selector
	 */
	public ClientThread(Selector selector){
		this.selector = selector;
		new Thread(this).start();
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try{
			while(selector.select()>0){
				for(SelectionKey key:selector.selectedKeys()){
					if(key.isReadable()){
						//使用NIO嘟嘟channel中的数据
						SocketChannel clientchannel = (SocketChannel)key.channel();
						ByteBuffer buffer = ByteBuffer.allocate(1024);
						clientchannel.read(buffer);
						buffer.flip();
						//将字节转化为UTF-16的字符串
						String receivedString = Charset.forName("UTF-16").newDecoder().decode(buffer).toString();
						System.out.println("服务器"+clientchannel.socket().getRemoteSocketAddress()+":"+receivedString);
						//为下次读取准备
						key.interestOps(SelectionKey.OP_READ);
					}
					//删除正在处理的selectionkey
					selector.selectedKeys().remove(key);
				}
				
			}
		}catch(IOException ex){
			ex.printStackTrace();
		}
	}
	

}

客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * 客户端
 * @author Administrator
 *
 */
public class Client {
	private Selector selector;
	SocketChannel socketchannel;
	private String hostIp;
	private int port;
	/**
	 * 构造函数
	 * @param hostIp
	 * @param port
	 * @throws IOException
	 */
	public Client (String hostIp,int port)throws IOException{
		this.hostIp = hostIp;
		this.port = port;
		
		initialize();
	}
	/**
	 * 初始化
	 * @throws IOException
	 */
	public void initialize()throws IOException{
		//打开监听信道并设置为非阻塞模式
		socketchannel = SocketChannel.open(new InetSocketAddress(hostIp,port));
		socketchannel.configureBlocking(false);
		//打开并注册到信道
		selector = Selector.open();
		socketchannel.register(selector, SelectionKey.OP_READ);
		//启动读线程
		new ClientThread(selector);
	}
	
	public void sendMsg(String message)throws IOException{
		ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes("UTF-16"));
		socketchannel.write(writeBuffer);
	}
	public static void main(String[] args)throws IOException{
		Client client = new Client("127.0.0.1",10005);
		Scanner read = new Scanner(System.in);
		while(true){
			System.out.println("本地:");
			String msg = read.next();
			client.sendMsg(msg);
		}
	
	}
}