前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点。
不过JDK7中又一次做了简单的实现,不支持同时投递多个AcceptEx请求,只支持一次一个,返回后再投递。这样,客户端连接的接受速度必然大打折扣。不知道为什么Sun会做这样的实现,WSASend()/WSAReceive()一次只允许一个还是可以理解,毕竟简化了编程,不用考虑封包乱序问题,
也降低了内存耗尽的风险。AcceptEx却没有这样的理由了。
于是再一次为了性能,我增加了同时投递多个AcceptEx请求的支持。
另外,在JDK7的默认实现中,AcceptEx返回后,为了设置远程和本地InetSocketAddress也采用了效率很低的方法。4次通过JNI调用getsockname,2次为了取sockaddr,2次为了取port. 这些操作本人采用GetAcceptExSockaddrs一次完成,进一步提高效率。
先看Java部分的代码,框架跟JDK7的一样,细节处理不一样:
/*
 *
 */
package sun.nio.ch;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AcceptPendingException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.ShutdownChannelGroupException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import sun.misc.Unsafe;
/**
 * Allows several {@code AcceptEx} requests to be outstanding at the same time on the completion
 * port of an existing asynchronous server socket channel, instead of the one-at-a-time accept
 * offered by the wrapped channel.
 *
 * <p>The wrapped channel's private {@code handle}, {@code iocp}, {@code ioCache} and
 * {@code accepting} fields are read reflectively; its own accept path is disabled by forcing
 * {@code accepting} to {@code true}. Each outstanding accept uses one native address buffer taken
 * from a pool that is allocated in the constructor and released by {@link #close()}.
 *
 * @author Yvon
 */
public class WindowsMultiAcceptSupport {
    /** Listener channel whose internals are borrowed. */
    WindowsAsynchronousServerSocketChannelImpl schannel;

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // Size of one AcceptEx output buffer: 2 * (sizeof(SOCKETADDRESS) + 16), matching the lengths
    // the native accept0/updateAcceptContext pass to AcceptEx/GetAcceptExSockaddrs.
    // NOTE(review): 88 presumes sizeof(SOCKETADDRESS) == 28 - confirm against the native headers.
    private static final int ONE_DATA_BUFFER_SIZE = 88;

    /** Native handle of the listener socket. */
    private long handle;
    private Iocp iocp;

    // Cache of pending I/O (OVERLAPPED structures) shared with the listener channel. The
    // corresponding OVERLAPPED cannot be reused/released until the completion event has been
    // posted.
    private PendingIoCache ioCache;

    /** Pool of idle native buffers that receive the local/remote socket addresses. */
    private Queue<Long> dataBuffers;

    /** Number of accepts currently reserved or in flight; never exceeds {@link #maxPending}. */
    private AtomicInteger pendingAccept;
    private int maxPending;

    /** Set by {@link #close()}; buffers returned afterwards are freed instead of pooled. */
    private volatile boolean closed;

    Method updateAcceptContextM;
    Method acceptM;

    WindowsMultiAcceptSupport() {
        // dummy for JNI code.
    }

    /**
     * Closes the listener channel and frees every idle native buffer. Buffers still owned by an
     * in-flight accept are freed when that accept hands its buffer back. Calling this method more
     * than once is harmless with respect to the buffer pool.
     *
     * @throws IOException if closing the listener channel fails (idle buffers are still freed)
     */
    public void close() throws IOException {
        closed = true;
        try {
            schannel.close();
        } finally {
            releaseIdleBuffers();
        }
    }

    /** Frees every buffer currently sitting in the pool; each buffer is polled by one thread only. */
    private void releaseIdleBuffers() {
        Long addr;
        while ((addr = dataBuffers.poll()) != null) {
            unsafe.freeMemory(addr.longValue());
        }
    }

    /**
     * Wraps {@code ch} so that up to {@code maxPost} accepts may be pending simultaneously.
     *
     * @param ch      the server channel; must be a {@code WindowsAsynchronousServerSocketChannelImpl}
     * @param maxPost maximum number of simultaneously pending accepts, in the range 1..1024
     * @throws IllegalStateException if {@code maxPost} is out of range, or if the internals of
     *                               the channel cannot be accessed reflectively
     */
    public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
        if (maxPost <= 0 || maxPost > 1024)
            throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");
        this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
        maxPending = maxPost;
        pendingAccept = new AtomicInteger(0);
        // Resolve the channel internals before allocating native memory so that a reflection
        // failure neither leaks buffers nor leaves a half-initialised instance behind.
        try {
            handle = channelField("handle").getLong(schannel);
            iocp = (Iocp) channelField("iocp").get(schannel);
            ioCache = (PendingIoCache) channelField("ioCache").get(schannel);
            AtomicBoolean accepting = (AtomicBoolean) channelField("accepting").get(schannel);
            accepting.set(true);// disable accepting by origin channel.
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Unable to access internals of WindowsAsynchronousServerSocketChannelImpl", e);
        }
        dataBuffers = new ConcurrentLinkedQueue<Long>();
        // one spare buffer in addition to the maxPending that can be in flight
        for (int i = 0; i < maxPending + 1; i++) {
            dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
        }
    }

    /** Looks up a private field of the listener channel implementation and makes it accessible. */
    private static Field channelField(String name) throws NoSuchFieldException {
        Field f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField(name);
        f.setAccessible(true);
        return f;
    }

    /**
     * Initiates an asynchronous accept whose outcome is reported to {@code handler}.
     *
     * @param attachment object passed through to the handler; may be {@code null}
     * @param handler    completion handler; must not be {@code null}
     * @throws NullPointerException   if {@code handler} is {@code null}
     * @throws AcceptPendingException if the maximum number of accepts is already pending
     * @throws NotYetBoundException   if the listener channel is not bound
     */
    @SuppressWarnings("unchecked")
    public final <A> void accept(A attachment,
            CompletionHandler<AsynchronousSocketChannel, ? super A> handler) {
        if (handler == null)
            throw new NullPointerException("'handler' is null");
        implAccept(attachment, (CompletionHandler<AsynchronousSocketChannel, Object>) handler);
    }

    /**
     * Task to initiate accept operation and to handle result.
     */
    private class AcceptTask implements Runnable, Iocp.ResultHandler {
        private final WindowsAsynchronousSocketChannelImpl channel;
        private final AccessControlContext acc;
        private final PendingFuture<AsynchronousSocketChannel, Object> result;
        private final long dataBuffer;
        /** Guards against handing the same buffer/slot back twice. */
        private final AtomicBoolean released = new AtomicBoolean(false);

        AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
                long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
            this.channel = channel;
            this.acc = acc;
            this.result = result;
            this.dataBuffer = dataBuffer;
        }

        /**
         * Hands this task's buffer and pending slot back. The buffer is returned before the slot
         * is released so that a caller admitted by the freed slot always finds a buffer.
         */
        void enableAccept() {
            if (!released.compareAndSet(false, true))
                return;
            dataBuffers.add(dataBuffer);
            pendingAccept.decrementAndGet();
            if (closed) {
                // support already closed: nobody will reuse the pool, so free it now
                releaseIdleBuffers();
            }
        }

        void closeChildChannel() {
            try {
                channel.close();
            } catch (IOException ignore) {
            }
        }

        // caller must have acquired read lock for the listener and child channel.
        void finishAccept() throws IOException {
            /*
             * JDK7 uses 4 calls to getsockname to set up the local and remote addresses, which
             * is very inefficient. Here a single native call (GetAcceptExSockaddrs) fills both.
             */
            InetAddress[] socks = new InetAddress[2];
            int[] ports = new int[2];
            updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
            if (socks[0] == null || socks[1] == null)
                throw new IOException("Unable to determine addresses of accepted connection");
            InetSocketAddress local = new InetSocketAddress(socks[0], ports[0]);
            final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
            channel.setConnected(local, remote);
            // permission check (in context of initiating thread)
            if (acc != null) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        SecurityManager sm = System.getSecurityManager();
                        // the security manager may have been removed since acc was captured
                        if (sm != null)
                            sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());
                        return null;
                    }
                }, acc);
            }
        }

        /**
         * Initiates the accept operation.
         */
        @Override
        public void run() {
            long overlapped = 0L;
            try {
                // begin usage of listener socket
                schannel.begin();
                try {
                    // begin usage of child socket (as it is registered with
                    // completion port and so may be closed in the event that
                    // the group is forcefully closed).
                    channel.begin();
                    synchronized (result) {
                        overlapped = ioCache.add(result);
                        // the buffer address must stay valid until the accept completes
                        int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
                        if (n == IOStatus.UNAVAILABLE) {
                            // pending: completed()/failed() will be invoked later
                            return;
                        }
                        if (n != 0) {
                            // native code reports success as 0; anything else is a failure
                            throw new IOException("AcceptEx failed (native status " + n + ")");
                        }
                        // connection accepted immediately
                        finishAccept();
                        // allow another accept before the result is set
                        enableAccept();
                        result.setResult(channel);
                    }
                } finally {
                    // end usage on child socket
                    channel.end();
                }
            } catch (Throwable x) {
                // failed to initiate accept so release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                enableAccept();
                result.setFailure(x);
            } finally {
                // end of usage of listener socket
                schannel.end();
            }
            // accept completed immediately but may not have executed on
            // initiating thread in which case the operation may have been
            // cancelled.
            if (result.isCancelled()) {
                closeChildChannel();
            }
            // invoke completion handler
            Invoker.invokeIndirectly(result);
        }

        /**
         * Executed when the I/O has completed
         */
        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            try {
                // connection accept after group has shutdown
                if (iocp.isShutdown()) {
                    throw new IOException(new ShutdownChannelGroupException());
                }
                // finish the accept
                try {
                    schannel.begin();
                    try {
                        channel.begin();
                        finishAccept();
                    } finally {
                        channel.end();
                    }
                } finally {
                    schannel.end();
                }
                // allow another accept before the result is set
                enableAccept();
                result.setResult(channel);
            } catch (Throwable x) {
                enableAccept();
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                result.setFailure(x);
            }
            // if an async cancel has already cancelled the operation then
            // close the new channel so as to free resources
            if (result.isCancelled()) {
                closeChildChannel();
            }
            // invoke handler (but not directly)
            Invoker.invokeIndirectly(result);
        }

        /**
         * Executed when the I/O has failed.
         */
        @Override
        public void failed(int error, IOException x) {
            enableAccept();
            closeChildChannel();
            // release waiters
            if (schannel.isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invokeIndirectly(result);
        }
    }

    /**
     * Starts one accept operation.
     *
     * @param attachment object passed through to the handler; may be {@code null}
     * @param handler    completion handler, or {@code null} to obtain the outcome via the
     *                   returned future
     * @return the pending future, an already-failed future when {@code handler} is {@code null}
     *         and the accept could not be started, or {@code null} when such a failure was
     *         dispatched to {@code handler}
     * @throws AcceptPendingException if {@code maxPending} accepts are already outstanding
     * @throws NotYetBoundException   if the listener channel has no local address
     */
    Future<AsynchronousSocketChannel> implAccept(Object attachment,
            final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
        if (!schannel.isOpen()) {
            return failImmediately(new ClosedChannelException(), attachment, handler);
        }
        if (schannel.isAcceptKilled())
            throw new RuntimeException("Accept not allowed due to cancellation");
        // ensure channel is bound to local address
        if (schannel.localAddress == null)
            throw new NotYetBoundException();
        // create the socket that will be accepted. The creation of the socket
        // is enclosed by a begin/end for the listener socket to ensure that
        // we check that the listener is open and also to prevent the I/O
        // port from being closed as the new socket is registered.
        WindowsAsynchronousSocketChannelImpl ch = null;
        IOException ioe = null;
        try {
            schannel.begin();
            ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
        } catch (IOException x) {
            ioe = x;
        } finally {
            schannel.end();
        }
        if (ioe != null) {
            return failImmediately(ioe, attachment, handler);
        }
        // need calling context when there is security manager as
        // permission check may be done in a different thread without
        // any application call frames on the stack
        AccessControlContext acc =
                (System.getSecurityManager() == null) ? null : AccessController.getContext();
        // atomically claim one of the maxPending slots; the child socket created above
        // must not be leaked when the claim is refused
        if (!tryReservePendingSlot()) {
            closeQuietly(ch);
            throw new AcceptPendingException();
        }
        Long buffer = dataBuffers.poll();
        if (buffer == null) {
            // a slot was free yet the pool is empty: the pool was drained by close()
            pendingAccept.decrementAndGet();
            closeQuietly(ch);
            return failImmediately(new ClosedChannelException(), attachment, handler);
        }
        PendingFuture<AsynchronousSocketChannel, Object> result =
                new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);
        AcceptTask task = new AcceptTask(ch, acc, buffer.longValue(), result);
        result.setContext(task);
        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            task.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this.schannel, task);
        }
        return result;
    }

    /** Increments {@code pendingAccept} unless it already equals {@code maxPending}. */
    private boolean tryReservePendingSlot() {
        for (;;) {
            int current = pendingAccept.get();
            if (current >= maxPending)
                return false;
            if (pendingAccept.compareAndSet(current, current + 1))
                return true;
        }
    }

    /** Reports a failure that occurred before any I/O was initiated. */
    private Future<AsynchronousSocketChannel> failImmediately(Throwable exc, Object attachment,
            CompletionHandler<AsynchronousSocketChannel, Object> handler) {
        if (handler == null)
            return CompletedFuture.withFailure(exc);
        Invoker.invokeIndirectly(schannel, handler, attachment, null, exc);
        return null;
    }

    /** Closes a child channel that will not be used, ignoring any I/O error. */
    private static void closeQuietly(WindowsAsynchronousSocketChannelImpl ch) {
        try {
            ch.close();
        } catch (IOException ignored) {
            // nothing useful can be done; the channel is being discarded anyway
        }
    }

    // reimplemented for performance: sets SO_UPDATE_ACCEPT_CONTEXT and fills in the local
    // (index 0) and remote (index 1) address/port from the AcceptEx output buffer
    static native void updateAcceptContext(long listenSocket, long acceptSocket,
            InetAddress[] addresses, int[] ports, long dataBuffer) throws IOException;

    // posts an AcceptEx; returns 0 on immediate success, IOStatus.UNAVAILABLE when pending
    static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);
}
对应的CPP代码如下:
/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    updateAcceptContext
 * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
 *
 * Applies SO_UPDATE_ACCEPT_CONTEXT to the accepted socket, then decodes the local and
 * remote addresses from the AcceptEx output buffer with GetAcceptExSockaddrs.
 * On return sockArray[0]/portArray[0] hold the local address/port and
 * sockArray[1]/portArray[1] the remote ones. Leaves a Java exception pending and
 * returns early if an address cannot be decoded.
 */
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
(JNIEnv *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
{
    SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
    INT iLocalAddrLen = 0;
    INT iRemoteAddrLen = 0;
    /* initialised so that a call that does not fill them in can be detected */
    SOCKETADDRESS* lpLocalAddr = NULL;
    SOCKETADDRESS* lpRemoteAddr = NULL;
    jobject localAddr;
    jobject remoteAddr;
    jint ports[2] = {0};

    /* NOTE(review): the result is not checked, as in the original code - confirm that
       a failure here should stay best-effort. */
    setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&s1, sizeof(s1));

    /* the three lengths must match the ones passed to AcceptEx in accept0 */
    (lpGetAcceptExSockaddrs)(outputBuffer,
                             0,
                             sizeof(SOCKETADDRESS)+16,
                             sizeof(SOCKETADDRESS)+16,
                             (LPSOCKADDR*)&lpLocalAddr,
                             &iLocalAddrLen,
                             (LPSOCKADDR*)&lpRemoteAddr,
                             &iRemoteAddrLen);
    if (lpLocalAddr == NULL || lpRemoteAddr == NULL) {
        JNU_ThrowByName(env, "java/io/IOException", "GetAcceptExSockaddrs returned no address");
        return;
    }

    localAddr = lpNET_SockaddrToInetAddress(env, (struct sockaddr *)lpLocalAddr, (int *)ports);
    if (localAddr == NULL) {
        /* no further JNI calls may be made while an exception is pending */
        if (!env->ExceptionCheck()) {
            JNU_ThrowByName(env, "java/io/IOException", "can't convert local socket address");
        }
        return;
    }
    remoteAddr = lpNET_SockaddrToInetAddress(env, (struct sockaddr *)lpRemoteAddr, (int *)(ports+1));
    if (remoteAddr == NULL) {
        if (!env->ExceptionCheck()) {
            JNU_ThrowByName(env, "java/io/IOException", "can't convert remote socket address");
        }
        return;
    }

    env->SetObjectArrayElement(sockArray, 0, localAddr);
    env->SetObjectArrayElement(sockArray, 1, remoteAddr);
    env->SetIntArrayRegion(portArray, 0, 2, ports);
}
/*
* Class: sun_nio_ch_WindowsMultiAcceptSupport
* Method: accept0
* Signature: (JJJJ)I
*/
jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
(JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
{
BOOL res;
SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
DWORD nread = 0;
OVERLAPPED* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
//why use SOCKETADDRESS?
//because client may use IPv6 to connect to server.
res = (lpAcceptEx)(s1,
s2,
outputBuffer,
0,
sizeof(SOCKETADDRESS)+16,
sizeof(SOCKETADDRESS)+16,
&nread,
lpOverlapped);
if (res == 0) {
int error = WSAGetLastError();
if (error == ERROR_IO_PENDING) {
return NIO2_IOS_UNAVAILABLE;
}
return NIO2_THROWN;
}
return 0;
}
这里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,从DLL里加载。相应代码如下:
* Class: com_yovn_jabhttpd_utilities_SunPackageFixer
* Method: initFds
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
(JNIEnv *env, jclass clazz)
{
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidTransmitFile = WSAID_TRANSMITFILE;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
SOCKET s;
int rv;
DWORD dwBytes;
HMODULE hModule;
s = socket(AF_INET, SOCK_STREAM, 0);
if (s == INVALID_SOCKET) {
JNU_ThrowByName(env,"java/io/IOException", "socket failed");
return;
}
rv = WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(LPVOID)&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpAcceptEx,
sizeof(lpAcceptEx),
&dwBytes,
NULL,
NULL);
if (rv != 0)
{
JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get AcceptEx ");
goto _ret;
}
rv = WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(LPVOID)&GuidTransmitFile,
sizeof(GuidTransmitFile),
&lpTransmitFile,
sizeof(lpTransmitFile),
&dwBytes,
NULL,
NULL);
if (rv != 0)
{
JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get TransmitFile");
goto _ret;
}
rv = WSAIoctl(s,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(LPVOID)&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&lpGetAcceptExSockaddrs,
sizeof(lpGetAcceptExSockaddrs),
&dwBytes,
NULL,
NULL);
if (rv != 0)
{
JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");
goto _ret;
}
hModule=LoadLibrary("net.dll");
if(hModule==NULL)
{
JNU_ThrowByName(env, "java/io/IOException","can't load java net.dll");
goto _ret;
}
lpNET_SockaddrToInetAddress=(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");
if(lpNET_SockaddrToInetAddress==NULL)
{
JNU_ThrowByName(env, "java/io/IOException","can't resolve _NET_SockaddrToInetAddress function ");
}
_ret:
closesocket(s);
return;
}
细心的同学可能会发现,在创建socket之前没有初始化WinSock库,因为在这段代码前,我初始化了一个InetSocketAddress对象,这样JVM会加载NET.DLL并初始化WinSock库了。
OK,现在,你可以在支持类上同时发起多个AcceptEx请求了。
PS:基于这个我简单测试了下我的服务器,同时开5000个线程,每个下载3M多点的文件,一分钟内能够全部正确完成。
服务器正在开发中,有兴趣的请加入:http://code.google.com/p/jabhttpd