netty4服务端接收数据丢包

时间:2022-08-29 00:15:18
目标:实现服务端100000车辆数据接收 解析
问题:当同一个机器模拟终端数量达到3000以上时,服务端只能接收到2000多个数据包
备注:2000一下数据包解码 解析没有问题
模拟客户端与服务端netty配置如下
package org.dht.vehicle.com.socketfactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
 
 
 
 
public class BasicSocketServer implements SocketServer {
 
    protected ChannelHandler serverChannel; 
    protected Channel acceptorChannel;
    protected ServerBootstrap b ;   
    protected EventLoopGroup bossGroup ;
    protected EventLoopGroup workerGroup ;
    protected List<Integer> port;
    protected List<ChannelFuture> channelFuture;  
     
    public BasicSocketServer(){
        this.channelFuture = new ArrayList<ChannelFuture>();
    }
    public void setServerChannel(ChannelHandler serverChannel){
        this.serverChannel = serverChannel;
    }
    public ChannelHandler getServerChannel(){
        return this.serverChannel ;
    }
    public void setPort(List<Integer> port){
        this.port = port;
    }
     
    public void Start() throws Exception {
        // TODO Auto-generated method stub
        try{
             
            createServerBootstrap();
             
        }finally{
            Stop();
        }
    }
 
    public void Stop() throws Exception {
        // TODO Auto-generated method stub
        closeFuture();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
 
    public void Restart() throws Exception {
        // TODO Auto-generated method stub
        Stop();
        Start();
    }
 
    public void createServerBootstrap() throws Exception{
             
        // TODO Auto-generated method stub
    try{
         b           = new ServerBootstrap();
             
         bossGroup   = new NioEventLoopGroup(1);
         workerGroup = new NioEventLoopGroup(); 
                          
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_RCVBUF, 10*1024*1024)
         .option(ChannelOption.SO_SNDBUF, 1024*1024)
         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)   
         .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(serverChannel);
        bindPort();      
        // Wait until the server socket is closed.
        closeFuture();
          
     } finally {
         // Shut down all event loops to terminate all threads.
         bossGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
     }
    }
 
 
    public void bindPort() throws InterruptedException {
        // TODO Auto-generated method stub      
          
         
        Iterator<Integer> iter = port.iterator();           
        int nPort;            
            while(iter.hasNext())  
            {  
                nPort = (Integer)iter.next().intValue();  
                if(nPort>0){
                ChannelFuture f = b.bind(nPort).sync();                 
                channelFuture.add(f);
                }
                //port.remove(iter.next());
            }               
    }
 
 
     
    public void closeFuture() throws InterruptedException {
        // TODO Auto-generated method stub
         
          Iterator<ChannelFuture> iter = channelFuture.iterator();        
          ChannelFuture f = null;
           
            while(iter.hasNext())  
            {  
             
                f=(ChannelFuture)iter.next();
                if(null != f){
                    f.channel().closeFuture().sync();   
                 }
                //port.remove(iter.next());
            }                   
    }
 
}

package org.dht.vehicle.data.com;
 
 
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
 
import org.dht.vehicle.com.deCoder.BJVehicleDeviceDataDecoder;
import org.dht.vehicle.com.message.MessageManager;
import org.dht.vehicle.com.message.MessageSendVehicleRegister;
 
 
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
 
public class BJTCPComService {
 static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
 private static ChannelFuture f = null;;
 private static EventLoopGroup group = new NioEventLoopGroup();
 private static Bootstrap b = new Bootstrap();
 private static Map<String, DeviceConInfo> map = new ConcurrentHashMap<String, DeviceConInfo>();
 
 
 public static void start() {
 // TODO Auto-generated method stub
 
 
 b.group(group)
 .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_RCVBUF, 1024*1024)
                .option(ChannelOption.SO_SNDBUF, 10*1024*1024)      
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch)
 throws Exception {
 ChannelPipeline p = ch.pipeline();
 p.addLast(new BJVehicleDeviceDataDecoder());
 p.addLast(new DeviceClientHandler());
 }
 });
 
 
 }
 
 
 public static Map<String, DeviceConInfo> getMap() {
 return map;
 }
 
 
 public static Channel getChannel() {
 if (null != f)
 return f.channel();
 return null;
 }
 
 
 public static void connect(String ip, String port)
 throws NumberFormatException, InterruptedException {
 
 
 ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();
 DeviceConInfo d = new DeviceConInfo();
 d.socketChannel = (SocketChannel) f.channel();
 map.put(String.valueOf(1), d);
 }
 
 
 public static void connect(int num, int oneNums, String ip, String port,
 int beginID) {
 for (int i = 0; i < num; i++) {
 
 
 try {
 ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();
 
 
 System.out.println("====" + MessageManager.getMessageManger()
 + "====" + f.channel());
 DeviceConInfo d = new DeviceConInfo();
 String strID = String.format("%07d", i + 1 + beginID);
 String identiCode = "abcdefghij" + strID;
 d.socketChannel = (SocketChannel) f.channel();
 d.identiCode = identiCode;
 d.onState = BJProtocolConst.CONNECTED;
 map.put(identiCode, d);
 MessageSendVehicleRegister messagePack = new MessageSendVehicleRegister(
 null, 0, d);
 
 
 MessageManager.getMessageManger().addSocketMessage(messagePack);
 
 
 Thread.sleep(5);
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 }
 }
 
 
 public static void connectNums() {
 System.out.println("======client nums:" + map.size() + "=====");
 }
 
 
 public static int getOnLineDevices() {
 int nums = 0;
 for (Map.Entry entry : map.entrySet()) {
 DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
 
 
 if (BJProtocolConst.LOGINED == devConInfo.onState) {
 nums++;
 }
 
 
 }
 return nums;
 
 
 }
 
 
 public static void diConnect() {
 
 
 for (Map.Entry entry : map.entrySet()) {
 DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
 
 
 if (null != devConInfo.socketChannel)
 devConInfo.socketChannel.close();
 try {
 devConInfo.socketChannel.closeFuture().sync();
 } catch (InterruptedException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }
 map.remove(entry.getKey());
 }
 
 
 }
 
 
 public static void stop() {
 diConnect();
 group.shutdownGracefully();
 
 
 }
 
 
 public static DeviceConInfo update(String identiCode,
 DeviceConInfo deviceConInfo) {
 
 
 return map.put(identiCode, deviceConInfo);
 
 
 }
 
 
 public static DeviceConInfo get(String identiCode) {
 
 
 return map.get(identiCode);
 
 
 }
 
 
 public static void remove(SocketChannel socketChannel) {
 
 
 for (Map.Entry entry : map.entrySet()) {
 DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
 if (devConInfo.socketChannel == socketChannel) {
 map.remove(entry.getKey());
 }
 }
 }
}
 

3 个解决方案

#1


这不是全部代码把,莫非你让别人肉眼帮你看代码码?

#2


强大的底层NIO怎么可能,楼主详细说说,机器配置,测试网路

#3


楼主这个问题现在解决了吗 我也遇到了同样的问题

#1


这不是全部代码把,莫非你让别人肉眼帮你看代码码?

#2


强大的底层NIO怎么可能,楼主详细说说,机器配置,测试网路

#3


楼主这个问题现在解决了吗 我也遇到了同样的问题