基于JDK7 NIO2的高性能web服务器实践之二(转)

时间:2022-09-05 14:08:05

前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点。 

不过JDK7中又一次做了简单的实现:不支持同时投递多个AcceptEx请求,一次只能投递一个,等它返回后再投递下一个。这样,客户端连接的接受速度必然大打折扣。不知道为什么Sun会这样实现。WSASend()/WSARecv()一次只允许投递一个还可以理解,毕竟这样简化了编程,不用考虑数据包乱序问题,也降低了内存耗尽的风险;AcceptEx却没有这样的理由。

于是再一次为了性能,我增加了同时投递多个的支持。

另外,在JDK7的默认实现中,AcceptEx返回后,为了设置远程和本地InetSocketAddress,也采用了效率很低的方法:通过JNI调用4次(getsockname/getpeername),2次为了取地址,2次为了取端口。我改用GetAcceptExSockaddrs一次完成这些操作,进一步提高效率。


先看Java部分的代码,框架跟JDK7的一样,细节处理不一样:

/**
 * 
 */
package sun.nio.ch;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AcceptPendingException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.ShutdownChannelGroupException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import sun.misc.Unsafe;

/**
 * This class enable multiple 'AcceptEx' post on the completion port, hence improve the concurrent connection number.
 * @author Yvon
 *
 */
public class WindowsMultiAcceptSupport {

    WindowsAsynchronousServerSocketChannelImpl schannel;

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // 2 * (sizeof(SOCKET_ADDRESS) + 16)
    private static final int ONE_DATA_BUFFER_SIZE = 88;

    private long handle;
    private Iocp iocp;

    // typically there will be zero, or one I/O operations pending. In rare
    // cases there may be more. These rare cases arise when a sequence of accept
    // operations complete immediately and handled by the initiating thread.
    // The corresponding OVERLAPPED cannot be reused/released until the completion
    // event has been posted.
    private PendingIoCache ioCache;

    private Queue<Long> dataBuffers;
    // the data buffer to receive the local/remote socket address
    //        private final long dataBuffer;

    private AtomicInteger pendingAccept;
    private int maxPending;

    Method updateAcceptContextM;
    Method acceptM;

    WindowsMultiAcceptSupport() {
        //dummy for JNI code.
    }

    public void close() throws IOException {

        schannel.close();

        for (int i = 0; i < maxPending + 1; i++)//assert there is maxPending+1 buffer in the queue
        {
            long addr = dataBuffers.poll();
            // release  resources
            unsafe.freeMemory(addr);
        }

    }

    /**
     * 
     */
    public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
        if (maxPost <= 0 || maxPost > 1024)
            throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");
        this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
        maxPending = maxPost;
        dataBuffers = new ConcurrentLinkedQueue<Long>();
        for (int i = 0; i < maxPending + 1; i++) {
            dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
        }

        pendingAccept = new AtomicInteger(0);
        try {
            Field f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
            f.setAccessible(true);
            handle = f.getLong(schannel);


            f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
            f.setAccessible(true);
            iocp = (Iocp) f.get(schannel);

            f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
            f.setAccessible(true);
            ioCache = (PendingIoCache) f.get(schannel);

            f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
            f.setAccessible(true);
            AtomicBoolean accepting = (AtomicBoolean) f.get(schannel);

            accepting.set(true);//disable accepting by origin channel.

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    @SuppressWarnings("unchecked")
    public final <A> void accept(A attachment,
        CompletionHandler<AsynchronousSocketChannel, ? super A> handler) {
        if (handler == null)
            throw new NullPointerException("'handler' is null");
        implAccept(attachment, (CompletionHandler<AsynchronousSocketChannel, Object>) handler);
    }

    /**
     * Task to initiate accept operation and to handle result.
     */
    private class AcceptTask implements Runnable, Iocp.ResultHandler {

        private final WindowsAsynchronousSocketChannelImpl channel;
        private final AccessControlContext acc;
        private final PendingFuture<AsynchronousSocketChannel, Object> result;
        private final long dataBuffer;

        AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
            long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
            this.channel = channel;
            this.acc = acc;
            this.result = result;
            this.dataBuffer = dataBuffer;
        }

        void enableAccept() {
            pendingAccept.decrementAndGet();
            dataBuffers.add(dataBuffer);
        }

        void closeChildChannel() {
            try {
                channel.close();
            } catch (IOException ignore) {
            }
        }

        // caller must have acquired read lock for the listener and child channel.
        void finishAccept() throws IOException {
            /**
             * JDK7 use 4 calls to getsockname  to setup
             * local& remote address, this is very inefficient.
             * 
             * I change this to use GetAcceptExSockaddrs
             */

            InetAddress[] socks = new InetAddress[2];
            int[] ports = new int[2];
            updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
            InetSocketAddress local = new InetSocketAddress(socks[0], ports[0]);
            final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
            channel.setConnected(local, remote);

            // permission check (in context of initiating thread)
            if (acc != null) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {

                    public Void run() {
                        SecurityManager sm = System.getSecurityManager();
                        sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());

                        return null;
                    }
                }, acc);
            }
        }

        /**
         * Initiates the accept operation.
         */
        @Override
        public void run() {
            long overlapped = 0L;

            try {
                // begin usage of listener socket
                schannel.begin();
                try {
                    // begin usage of child socket (as it is registered with
                    // completion port and so may be closed in the event that
                    // the group is forcefully closed).
                    channel.begin();

                    synchronized (result) {
                        overlapped = ioCache.add(result);

                      
                        int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address
                        if (n == IOStatus.UNAVAILABLE) {
                            return;
                        }

                        // connection accepted immediately
                        finishAccept();

                        // allow another accept before the result is set
                        enableAccept();
                        result.setResult(channel);
                    }
                } finally {
                    // end usage on child socket
                    channel.end();
                }
            } catch (Throwable x) {
                // failed to initiate accept so release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                enableAccept();
                result.setFailure(x);
            } finally {
                // end of usage of listener socket
                schannel.end();
            }

            // accept completed immediately but may not have executed on
            // initiating thread in which case the operation may have been
            // cancelled.
            if (result.isCancelled()) {
                closeChildChannel();
            }

            // invoke completion handler
            Invoker.invokeIndirectly(result);
        }

        /**
         * Executed when the I/O has completed
         */
        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            try {
                // connection accept after group has shutdown
                if (iocp.isShutdown()) {
                    throw new IOException(new ShutdownChannelGroupException());
                }

                // finish the accept
                try {
                    schannel.begin();
                    try {
                        channel.begin();
                        finishAccept();
                    } finally {
                        channel.end();
                    }
                } finally {
                    schannel.end();
                }

                // allow another accept before the result is set
                enableAccept();
                result.setResult(channel);
            } catch (Throwable x) {
                enableAccept();
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                result.setFailure(x);
            }

            // if an async cancel has already cancelled the operation then
            // close the new channel so as to free resources
            if (result.isCancelled()) {
                closeChildChannel();
            }

            // invoke handler (but not directly)
            Invoker.invokeIndirectly(result);
        }

        @Override
        public void failed(int error, IOException x) {
            enableAccept();
            closeChildChannel();

            // release waiters
            if (schannel.isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invokeIndirectly(result);
        }
    }

    Future<AsynchronousSocketChannel> implAccept(Object attachment,
        final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
        if (!schannel.isOpen()) {
            Throwable exc = new ClosedChannelException();
            if (handler == null)
                return CompletedFuture.withFailure(exc);
            Invoker.invokeIndirectly(schannel, handler, attachment, null, exc);
            return null;
        }
        if (schannel.isAcceptKilled())
            throw new RuntimeException("Accept not allowed due to cancellation");

        // ensure channel is bound to local address
        if (schannel.localAddress == null)
            throw new NotYetBoundException();

        // create the socket that will be accepted. The creation of the socket
        // is enclosed by a begin/end for the listener socket to ensure that
        // we check that the listener is open and also to prevent the I/O
        // port from being closed as the new socket is registered.
        WindowsAsynchronousSocketChannelImpl ch = null;
        IOException ioe = null;
        try {
            schannel.begin();
            ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
        } catch (IOException x) {
            ioe = x;
        } finally {
            schannel.end();
        }
        if (ioe != null) {
            if (handler == null)
                return CompletedFuture.withFailure(ioe);
            Invoker.invokeIndirectly(this.schannel, handler, attachment, null, ioe);
            return null;
        }

        // need calling context when there is security manager as
        // permission check may be done in a different thread without
        // any application call frames on the stack
        AccessControlContext acc =
            (System.getSecurityManager() == null) ? null : AccessController.getContext();

        PendingFuture<AsynchronousSocketChannel, Object> result =
            new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);

        // check and set flag to prevent concurrent accepting
        if (pendingAccept.get() >= maxPending)
            throw new AcceptPendingException();
        pendingAccept.incrementAndGet();
        AcceptTask task = new AcceptTask(ch, acc, dataBuffers.poll(), result);
        result.setContext(task);

        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            task.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this.schannel, task);
        }
        return result;
    }

    //    //reimplements for performance
    static native void updateAcceptContext(long listenSocket, long acceptSocket,
        InetAddress[] addresses, int[] ports, long dataBuffer) throws IOException;

    static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);

}



对应的CPP代码如下:


/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    updateAcceptContext
 * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
 *
 * Finishes an accept started by accept0. It applies SO_UPDATE_ACCEPT_CONTEXT
 * to the accepted socket, then decodes the local and remote addresses that
 * AcceptEx wrote into buf, using a single GetAcceptExSockaddrs call.
 *
 * On success, sockArray[0]/portArray[0] receive the local address/port and
 * sockArray[1]/portArray[1] receive the remote address/port. On failure, a
 * Java exception is pending on return and the arrays are left untouched.
 *
 * buf must be the buffer passed to accept0 for this socket. The address
 * lengths below must match the ones used there.
 */
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
(JNIEnv *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
{
    SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
    INT iLocalAddrLen = 0;
    INT iRemoteAddrLen = 0;
    SOCKETADDRESS* lpLocalAddr = NULL;
    SOCKETADDRESS* lpRemoteAddr = NULL;
    jobject localAddr;
    jobject remoteAddr;
    jint ports[2] = {0};

    /* Inherit the listener's context into the accepted socket; report a
       failure instead of silently ignoring it. */
    if (setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                   (char *)&s1, sizeof(s1)) == SOCKET_ERROR) {
        char msg[80];
        sprintf(msg, "setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed (error %d)", WSAGetLastError());
        JNU_ThrowByName(env, "java/io/IOException", msg);
        return;
    }

    /* No receive data was requested (0), so the buffer holds only the two
       addresses, each sizeof(SOCKETADDRESS)+16 bytes as passed to AcceptEx. */
    (lpGetAcceptExSockaddrs)(outputBuffer,
        0,
        sizeof(SOCKETADDRESS)+16,
        sizeof(SOCKETADDRESS)+16,
        (LPSOCKADDR*)&lpLocalAddr,
        &iLocalAddrLen,
        (LPSOCKADDR*)&lpRemoteAddr,
        &iRemoteAddrLen);

    localAddr = lpNET_SockaddrToInetAddress(env, (struct sockaddr *)lpLocalAddr, (int *)ports);
    if (localAddr == NULL) {
        /* make sure the Java caller sees a failure, not a null address */
        if (!env->ExceptionCheck()) {
            JNU_ThrowByName(env, "java/io/IOException", "can't convert local socket address");
        }
        return;
    }
    remoteAddr = lpNET_SockaddrToInetAddress(env, (struct sockaddr *)lpRemoteAddr, (int *)(ports + 1));
    if (remoteAddr == NULL) {
        if (!env->ExceptionCheck()) {
            JNU_ThrowByName(env, "java/io/IOException", "can't convert remote socket address");
        }
        return;
    }

    env->SetObjectArrayElement(sockArray, 0, localAddr);
    env->SetObjectArrayElement(sockArray, 1, remoteAddr);
    env->SetIntArrayRegion(portArray, 0, 2, ports);
}

/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    accept0
 * Signature: (JJJJ)I
 *
 * Posts one AcceptEx on listenSocket for the pre-created acceptSocket, using
 * the OVERLAPPED at ov (zeroed here) and the address buffer at buf.
 *
 * Returns NIO2_IOS_UNAVAILABLE when the accept is pending (the result is
 * delivered through the completion port) and 0 when it completed
 * immediately. On immediate failure, it throws java.io.IOException and
 * returns NIO2_THROWN.
 */
JNIEXPORT jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
  (JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
{
    BOOL res;
    SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);

    DWORD nread = 0;
    OVERLAPPED* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
    ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));

    /* SOCKETADDRESS is used for the address slots because a client may
       connect over IPv6; AcceptEx needs 16 extra bytes per slot. No receive
       data is requested (0), so AcceptEx completes as soon as a connection
       arrives. */
    res = (lpAcceptEx)(s1,
        s2,
        outputBuffer,
        0,
        sizeof(SOCKETADDRESS)+16,
        sizeof(SOCKETADDRESS)+16,
        &nread,
        lpOverlapped);

    if (res == 0) {
        int error = WSAGetLastError();
        if (error == ERROR_IO_PENDING) {
            return NIO2_IOS_UNAVAILABLE;
        }
        /* Raise a Java exception so the caller cannot mistake an immediate
           failure for an accepted connection. */
        char msg[64];
        sprintf(msg, "AcceptEx failed (error %d)", error);
        JNU_ThrowByName(env, "java/io/IOException", msg);
        return NIO2_THROWN;
    }

    return 0;
}


这里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,从DLL里加载。相应代码如下:

/*
 * Class:     com_yovn_jabhttpd_utilities_SunPackageFixer
 * Method:    initFds
 * Signature: ()V
 *
 * Resolves the Winsock extension functions AcceptEx, TransmitFile and
 * GetAcceptExSockaddrs into lpAcceptEx, lpTransmitFile and
 * lpGetAcceptExSockaddrs. It also resolves NET_SockaddrToInetAddress from
 * the JDK's net.dll into lpNET_SockaddrToInetAddress. Throws
 * java.io.IOException if any of them cannot be resolved.
 *
 * Does not call WSAStartup: WinSock must already be initialised (the caller
 * arranges for the JVM to load net.dll first).
 */
JNIEXPORT void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
  (JNIEnv *env, jclass clazz)
{
    /* All locals are declared up front: the gotos below must not jump over
       an initialisation (ill-formed in C++). */
    GUID GuidAcceptEx = WSAID_ACCEPTEX;
    GUID GuidTransmitFile = WSAID_TRANSMITFILE;
    GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
    SOCKET s;
    int rv;
    DWORD dwBytes;
    HMODULE hModule;

    /* any socket will do: extension pointers are queried through WSAIoctl */
    s = socket(AF_INET, SOCK_STREAM, 0);
    if (s == INVALID_SOCKET) {
        JNU_ThrowByName(env,"java/io/IOException", "socket failed");
        return;
    }
    rv = WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)&GuidAcceptEx,
        sizeof(GuidAcceptEx),
        &lpAcceptEx,
        sizeof(lpAcceptEx),
        &dwBytes,
        NULL,
        NULL);
    if (rv != 0)
    {
        JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get AcceptEx ");
        goto _ret;
    }
    rv = WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)&GuidTransmitFile,
        sizeof(GuidTransmitFile),
        &lpTransmitFile,
        sizeof(lpTransmitFile),
        &dwBytes,
        NULL,
        NULL);
    if (rv != 0)
    {
        JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get TransmitFile");
        goto _ret;
    }
    rv = WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)&GuidGetAcceptExSockAddrs,
        sizeof(GuidGetAcceptExSockAddrs),
        &lpGetAcceptExSockaddrs,
        sizeof(lpGetAcceptExSockaddrs),
        &dwBytes,
        NULL,
        NULL);
    if (rv != 0)
    {
        JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");
        goto _ret;
    }

    /* Prefer the net.dll the JVM has already loaded, so we never pick up a
       different net.dll from the search path. Fall back to loading it; that
       module is then intentionally kept loaded for the resolved pointer.
       The explicit ANSI variants keep this correct in UNICODE builds. */
    hModule = GetModuleHandleA("net.dll");
    if (hModule == NULL)
    {
        hModule = LoadLibraryA("net.dll");
    }
    if (hModule == NULL)
    {
        JNU_ThrowByName(env, "java/io/IOException","can't load java net.dll");
        goto _ret;
    }

    /* x86 __stdcall exports are typically decorated (_name@bytes); x64 has
       no such decoration, so fall back to the plain name. */
    lpNET_SockaddrToInetAddress = (NET_SockaddrToInetAddress_t)GetProcAddress(hModule, "_NET_SockaddrToInetAddress@12");
    if (lpNET_SockaddrToInetAddress == NULL)
    {
        lpNET_SockaddrToInetAddress = (NET_SockaddrToInetAddress_t)GetProcAddress(hModule, "NET_SockaddrToInetAddress");
    }
    if (lpNET_SockaddrToInetAddress == NULL)
    {
        JNU_ThrowByName(env, "java/io/IOException","can't resolve _NET_SockaddrToInetAddress function ");
    }

_ret:
    closesocket(s);
    return;
}


细心的同学可能会发现,在创建socket之前并没有初始化WinSock库(没有调用WSAStartup)。这是因为在调用这段代码之前,我先创建了一个InetSocketAddress对象,这样JVM会加载NET.DLL并完成WinSock库的初始化。

OK,现在,你可以在支持类上同时发起多个AcceptEx请求了。

PS:基于这个我简单测试了下我的服务器,同时开5000个线程,每个下载3M多点的文件,一分钟内能够全部正确完成。
服务器正在开发中,有兴趣的请加入:http://code.google.com/p/jabhttpd (注:Google Code 已于2016年停止服务,该链接可能已失效。)