本文使用java的NIO简单实现server-client模式,处理异步队列。
缓存队列类
public class Buffer {
private static Queue<Object> queue = new LinkedList<Object>();
private static int INITSIZE = 2;
private Lock mutex;
private Condition condition;
private Buffer(){
mutex = new ReentrantLock();
condition = ();
}
public static Buffer getIntance(){
return ;
}
static class QueueBuffer{
private static Buffer instance = new Buffer();
}
public void setInitSize(int size){
INITSIZE = size;
}
public void produce(String msg){
mutex.lock();
try {
while(() >= INITSIZE ){
System.out.println("queue wait to consume");
condition.await();
}
(msg);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
();
} finally {
();
();
}
}
public Object consume(){
mutex.lock();
try {
while (() == 0) {
System.out.println("queue wait to produce");
condition.await();
}
return ();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
();
return null;
} finally {
();
();
}
}
public int getQueueSize(){
return ();
}
}
任务类
public class Task implements Runnable{
private static ConcurrentLinkedQueue<SelectionKey> clq = new ConcurrentLinkedQueue<>();
private NIOServer server;
public Task(NIOServer server){
this.server = server;
}
public static void push(SelectionKey key){
(key);
}
@Override
public void run() {
try {
SelectionKey key = ();
(key);
} catch (IOException e) {
// TODO Auto-generated catch block
();
}
}
}
服务端类
public class NIOServer {
private ServerSocketChannel serverSocketChannel;
int port = 8080;
private Selector selector;
ByteBuffer recBuffer = (1024);
ByteBuffer sendBuffer = (1024);
Map<SelectionKey, String> clientMsgs = new HashMap<>();
private static int client_no = 0;
ExecutorService serviceExecutor = (10);
Task task = null;
public NIOServer(int port) throws IOException {
this.port = port;
serverSocketChannel = ();
().bind(new InetSocketAddress(this.port));
(false);
selector = ();
(selector, SelectionKey.OP_ACCEPT);
System.out.println("init finish");
task = new Task(this);
}
public void listen() throws IOException {
while (true) {
System.out.println("server scanning");
int event = selector.select();
if (event > 0) {
Set<SelectionKey> keys = ();
Iterator<SelectionKey> iterator = ();
while (()) {
SelectionKey key = ();
();
process(key);
}
}
}
}
private void process(SelectionKey selectionKey) {
SocketChannel client = null;
try {
if (() && ()) {
client = ();
++client_no;
(false);
(selector, SelectionKey.OP_READ);
} else if (() && ()) {
readHandle(selectionKey);
} else if (() && ()) {
if ((selectionKey)) {
writeMsgHanle(selectionKey);
}
}
} catch (Exception e) {
();
try {
().close();
} catch (IOException e1) {
// TODO Auto-generated catch block
();
}
}
}
private void readHandle(SelectionKey selectionKey) throws IOException, InterruptedException {
SocketChannel client;
client = (SocketChannel) ();
int len = 0;
while ((len = (recBuffer)) > 0) {
();
String sb = new String((), 0, len);
(selectionKey, ());
System.out.println("Thread:" + ().getId() + ",NO." + client_no + " client send msg:"
+ ());
();
}
(selectionKey);
(task);
}
public void msgHandle(SelectionKey key) throws IOException {
String msg = clientMsgs.get(key);
String serverMsg = null;
if (msg != null && !"".equals(msg)) {
if (("add")) {
msg = (("add") + 4);
().produce(msg);
serverMsg = "server:add " + msg + " to queue successfully";
} else if (("poll")) {
String consumeMsg = (String) ().consume();
serverMsg = "server:remove " + consumeMsg + " from queue successfully";
} else if (("size")) {
serverMsg = "server:size is " + ().getQueueSize();
} else {
serverMsg = "server:no such command";
}
} else {
serverMsg = "server:blank message";
}
(key,serverMsg);
writeMsgHanle(key);
}
private void writeMsgHanle(SelectionKey key) throws IOException {
String msg = clientMsgs.get(key);
SocketChannel client;
();
(());
client = (SocketChannel) ();
();// write in range of text length
while (()) {
(sendBuffer);
}
(SelectionKey.OP_READ);
}
public static void main(String[] args) throws IOException {
new NIOServer(8080).listen();
}
}
客户端类
public class NIOClient {
private SocketChannel client;
private Selector selector;
private ByteBuffer sendBuffer = (1024);
private ByteBuffer receiveBuffer = (1024);
public NIOClient(String ip, int port) throws IOException{
client = ();
selector = ();
(false);
(selector, SelectionKey.OP_READ);
(new InetSocketAddress(ip, port));
while (!()) {
System.out.println("finish connect");
}
}
public void run(){
while(true){
try {
while (!writeHandle()) {
System.out.println("please input a not empty command");
}
int event = selector.select();
if (event > 0) {
Set<SelectionKey> keys = ();
Iterator<SelectionKey> iterator = ();
while (()) {
SelectionKey key = ();
if (() && ()) {
readHandle(key);
}
();
}
}
} catch (Exception e) {
();
if (()) {
try {
();
} catch (IOException e1) {
();
}
}
}
}
}
private void readHandle(SelectionKey key) throws IOException {
SocketChannel scChannel = null;
scChannel = (SocketChannel) ();
();
int len = 0;
if ((len = (receiveBuffer)) > 0) {
();
String msg = new String((),0,len);
System.out.println(msg);
}
}
public boolean writeHandle() throws IOException{
Scanner scanner = new Scanner(System.in);
String line = ();
if (line != null && () > 0) {
();
(());
();
while (()) {
(sendBuffer);
}
return true;
}else{
return false;
}
}
public static void main(String[] args) throws IOException {
new NIOClient("127.0.0.1", 8080).run();
}
}