Java AIO 异步IO应用实例

时间:2022-08-27 23:56:27

项目地址:https://github.com/windwant/windwant-demo/tree/master/io-service

Server:

package org.windwant.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom; /**
* AsynchronousServerSocketChannel
*/
public class AIOServer implements Runnable{ private int port = 8889;
private int threadSize = 10;
protected AsynchronousChannelGroup asynchronousChannelGroup; protected AsynchronousServerSocketChannel serverChannel; public AIOServer(int port, int threadSize) {
this.port = port;
this.threadSize = threadSize;
init();
} private void init(){
try {
asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);
serverChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
serverChannel.bind(new InetSocketAddress(port));
System.out.println("listening on port: " + port);
} catch (IOException e) {
e.printStackTrace();
}
} public void run() {
try{
if(serverChannel == null) return;
serverChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
final ByteBuffer echoBuffer = ByteBuffer.allocateDirect(1024); public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
System.out.println("==============================================================");
System.out.println("server process begin ...");
try {
System.out.println("client host: " + result.getRemoteAddress());
echoBuffer.clear();
result.read(echoBuffer).get();
echoBuffer.flip();
System.out.println("received : " + Charset.defaultCharset().decode(echoBuffer)); int random = ThreadLocalRandom.current().nextInt(5);
printProcess(random);
System.out.println("server deal request execute: " + random + "s"); String msg = "server test msg-" + Math.random();
System.out.println("server send data: " + msg);
result.write(ByteBuffer.wrap(msg.getBytes()));
System.out.println("server process end ...");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
attachment.serverChannel.accept(attachment, this);// 监听新的请求,递归调用。
} } public void failed(Throwable exc, AIOServer attachment) {
System.out.println("received failed");
exc.printStackTrace();
attachment.serverChannel.accept(attachment, this);
}
});
System.in.read();
}catch (Exception e){
e.printStackTrace();
}
} private void printProcess(int s) throws InterruptedException {
String dot = "";
for (int i = 0; i < s; i++) {
Thread.sleep(1000);
dot += ".";
System.out.println(dot); }
} public static void main(String[] args) throws IOException {
new Thread(new AIOServer(8989, 19)).start();
}
}

Client:

package org.windwant.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler; /**
* AsynchronousSocketChannel
*/
public class AIOClient implements Runnable{ private AsynchronousSocketChannel client;
private String host;
private int port;
public AIOClient(String host, int port) throws IOException {
this.client = AsynchronousSocketChannel.open();
this.host = host;
this.port = port;
} public static void main(String[] args) {
try {
new Thread(new AIOClient("127.0.0.1", 8989)).start();
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} } public void run() {
client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
public void completed(Void result, Object attachment) {
String msg = "client test msg-" + Math.random();
client.write(ByteBuffer.wrap(msg.getBytes()));
System.out.println("client send data:" + msg);
} public void failed(Throwable exc, Object attachment) {
System.out.println("client send field...");
}
}); final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
client.read(byteBuffer, this, new CompletionHandler<Integer, Object>() {
public void completed(Integer result, Object attachment) {
System.out.println(result);
System.out.println("client read data: " + new String(byteBuffer.array()));
} public void failed(Throwable exc, Object attachment) {
System.out.println("read faield");
}
});
}
}

2017-12-11  改造client: AsynchronousChannelGroup

package org.windwant.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; /**
* AsynchronousSocketChannel
*/
public class AIOClient implements Runnable{ private AsynchronousChannelGroup group; //异步通道组 封装处理异步通道的网络IO操作
private String host;
private int port;
public AIOClient(String host, int port) {
this.host = host;
this.port = port;
initGroup();
} private void initGroup(){
if(group == null) {
try {
group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newFixedThreadPool(5), 5); //使用固定线程池实例化组
} catch (IOException e) {
e.printStackTrace();
}
}
} private void send(){
try {
//异步流式socket通道 open方法创建 并绑定到组 group
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);
//连接
client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
public void completed(Void result, Object attachment) {
String msg = "client test msg-" + Math.random();
client.write(ByteBuffer.wrap(msg.getBytes()));
System.out.println(Thread.currentThread().getName() + " client send data:" + msg); final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
client.read(byteBuffer, this, new CompletionHandler<Integer, Object>() {
public void completed(Integer result, Object attachment) {
System.out.println(Thread.currentThread().getName() + " client read data: " + new String(byteBuffer.array()));
try {
byteBuffer.clear();
if (client != null) client.close();
} catch (IOException e) {
e.printStackTrace();
}
} public void failed(Throwable exc, Object attachment) {
System.out.println("read faield");
}
});
} public void failed(Throwable exc, Object attachment) {
System.out.println("client send field...");
}
});
} catch (IOException e) {
e.printStackTrace();
}
} public void run() {
for (int i = 0; i < 100; i++) {
send();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} @Override
protected void finalize() throws Throwable {
super.finalize();
group.awaitTermination(10000, TimeUnit.SECONDS);
} public static void main(String[] args) {
try {
new Thread(new AIOClient("127.0.0.1", 8989)).start();
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} }
}

Java AIO 异步IO应用实例的更多相关文章

  1. Oracle 之 AIO &lpar;异步io&rpar;

    Linux 异步 I/O (AIO)是 Linux 内核中提供的一个增强的功能.它是Linux 2.6 版本内核的一个标准特性,AIO 背后的基本思想是允许进程发起很多 I/O 操作,而不用阻塞或等 ...

  2. 再谈一次关于Java中的 AIO&lpar;异步IO&rpar; 与 NIO&lpar;非阻塞IO&rpar;

    今天用ab进行压力测试时,无意发现的: Requests per second:    xxx [#/sec] (mean) ab -n 5000 -c 1000 http://www:8080/up ...

  3. Java网络编程和NIO详解5:Java 非阻塞 IO 和异步 IO

    Java网络编程和NIO详解5:Java 非阻塞 IO 和异步 IO Java 非阻塞 IO 和异步 IO 转自https://www.javadoop.com/post/nio-and-aio 本系 ...

  4. Java中的IO、NIO、File、BIO、AIO详解

    java中有几种类型的流?JDK为每种类型的流提供了一些抽象类以供继承,请说出他们分别是哪些类?         Java中的流分为两种,一种是字节流,另一种是字符流,分别由四个抽象类来表示(每种流包 ...

  5. Java 异步 IO

         新的异步功能的关键点,它们是Channel 类的一些子集,Channel 在处理IO操作的时候需要被切换成一个后台进程.一些需要访问较大,耗时的操作,或是其它的类似实例,可以考虑应用此功能. ...

  6. Java知识回顾 (9) 同步、异步IO

    一.基本概念 同步和异步: 同步和异步是针对应用程序和内核的交互而言的. 同步指的是用户进程触发IO 操作并等待或者轮询的去查看IO 操作是否就绪: 而异步是指用户进程触发IO 操作以后便开始做自己的 ...

  7. Oracle在Linux下使用异步IO(aio)配置

    1.首先用root用户安装以下必要的rpm包 # rpm -Uvh libaio-0.3.106-3.2.x86_64.rpm# rpm -Uvh libaio-devel-0.3.106-3.2.x ...

  8. Java网络编程 -- AIO异步网络编程

    AIO中的A即Asynchronous,AIO即异步IO.它是异步非阻塞的,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般我们的业务处理逻辑会变成一个回调函数,等待IO操 ...

  9. 异步IO与回调

    最好了解 Java NIO 中 Buffer.Channel 和 Selector 的基本操作,主要是一些接口操作,比较简单. 本文将介绍非阻塞 IO 和异步 IO,也就是大家耳熟能详的 NIO 和 ...

随机推荐

  1. MySQL:常见使用问题

    内容 1.Linux 上安装 MySQL 2.单机上安装多实例 3.不知root密码情况下,修改root密码 1.Linux 上安装MySQL 安装步骤: 1)解压 tar.gz文件 -linux-g ...

  2. 转载:奇异值分解&lpar;SVD&rpar; --- 线性变换几何意义(下)

    本文转载自他人: PS:一直以来对SVD分解似懂非懂,此文为译文,原文以细致的分析+大量的可视化图形演示了SVD的几何意义.能在有限的篇幅把这个问题讲解的如此清晰,实属不易.原文举了一个简单的图像处理 ...

  3. java中的接口回调

    [接口回调]接口回调是多态的另一种体现.接口回调是指:可以把使用某一个接口的类创建的对象的引用赋给该接口声明的接口变量中,那么该接口变量就可以调用被类实现的接口中的方法.当接口变量调用被类实现的接口中 ...

  4. Using SSL Certificates with HAProxy--reference

    原文地址:http://serversforhackers.com/editions/2014/07/29/haproxy-ssl-termation-pass-through/ Overview I ...

  5. RedHat虚拟机&colon;Vmware Tools的安装

    如果我们仔细看的话,                  就会发现在VMware软件界面的左下角处显示着                  “you don't have VMware Tools in ...

  6. MQ产品比较-ActiveMQ-RocketMQ

    几种MQ产品说明: ZeroMQ :  扩展性好,开发比较灵活,采用C语言实现,实际上他只是一个socket库的重新封装,如果我们做为消息队列使用,需要开发大量的代码 RabbitMQ :结合erla ...

  7. Http异步发送之HttpWebRequest的BeginGetResponse

    关于http异步发送,一开始我的做法都是用thread或者task去完成的:后来发现HttpWebRequest本身就提供一个异步的方法. 总感觉.Net自己提供的异步方法可能要优于我们自己用线程去实 ...

  8. GitHub上传文件或项目的教程

    既然是往GitHub上传文件,那GitHub账号必须得有,这时候就会有同学问:妖怪吧,我没有GitHub账号怎么办? 别急别急,打开GitHub网站https://github.com/,然后注册就O ...

  9. CIO必看:跨国集团采购部报表系统的建设经验分享

    CIO必看:跨国集团采购部报表系统的建设经验分享 引言 福耀集团是国内最具规模.技术水平最高.出口量最大的汽车玻璃生产供应商,产品"FY"商标是中国汽车玻璃行业第一个"中 ...

  10. XVII Open Cup named after E&period;V&period; Pankratiev&period; GP of Two Capitals

    A. Artifact Guarding 选出的守卫需要满足$\max(a+b)\leq \sum a$,从小到大枚举每个值作为$\max(a+b)$,在权值线段树上找到最大的若干个$a$即可. 时间 ...