问题:当同一个机器模拟终端数量达到3000以上时,服务端只能接收到2000多个数据包
备注:2000一下数据包解码 解析没有问题
模拟客户端与服务端netty配置如下
package org.dht.vehicle.com.socketfactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class BasicSocketServer implements SocketServer {
protected ChannelHandler serverChannel;
protected Channel acceptorChannel;
protected ServerBootstrap b ;
protected EventLoopGroup bossGroup ;
protected EventLoopGroup workerGroup ;
protected List<Integer> port;
protected List<ChannelFuture> channelFuture;
public BasicSocketServer(){
this.channelFuture = new ArrayList<ChannelFuture>();
}
public void setServerChannel(ChannelHandler serverChannel){
this.serverChannel = serverChannel;
}
public ChannelHandler getServerChannel(){
return this.serverChannel ;
}
public void setPort(List<Integer> port){
this.port = port;
}
public void Start() throws Exception {
// TODO Auto-generated method stub
try{
createServerBootstrap();
}finally{
Stop();
}
}
public void Stop() throws Exception {
// TODO Auto-generated method stub
closeFuture();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
public void Restart() throws Exception {
// TODO Auto-generated method stub
Stop();
Start();
}
public void createServerBootstrap() throws Exception{
// TODO Auto-generated method stub
try{
b = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_RCVBUF, 10*1024*1024)
.option(ChannelOption.SO_SNDBUF, 1024*1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(serverChannel);
bindPort();
// Wait until the server socket is closed.
closeFuture();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public void bindPort() throws InterruptedException {
// TODO Auto-generated method stub
Iterator<Integer> iter = port.iterator();
int nPort;
while(iter.hasNext())
{
nPort = (Integer)iter.next().intValue();
if(nPort>0){
ChannelFuture f = b.bind(nPort).sync();
channelFuture.add(f);
}
//port.remove(iter.next());
}
}
public void closeFuture() throws InterruptedException {
// TODO Auto-generated method stub
Iterator<ChannelFuture> iter = channelFuture.iterator();
ChannelFuture f = null;
while(iter.hasNext())
{
f=(ChannelFuture)iter.next();
if(null != f){
f.channel().closeFuture().sync();
}
//port.remove(iter.next());
}
}
}
package org.dht.vehicle.data.com;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.dht.vehicle.com.deCoder.BJVehicleDeviceDataDecoder;
import org.dht.vehicle.com.message.MessageManager;
import org.dht.vehicle.com.message.MessageSendVehicleRegister;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class BJTCPComService {
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
private static ChannelFuture f = null;;
private static EventLoopGroup group = new NioEventLoopGroup();
private static Bootstrap b = new Bootstrap();
private static Map<String, DeviceConInfo> map = new ConcurrentHashMap<String, DeviceConInfo>();
public static void start() {
// TODO Auto-generated method stub
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_RCVBUF, 1024*1024)
.option(ChannelOption.SO_SNDBUF, 10*1024*1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new BJVehicleDeviceDataDecoder());
p.addLast(new DeviceClientHandler());
}
});
}
public static Map<String, DeviceConInfo> getMap() {
return map;
}
public static Channel getChannel() {
if (null != f)
return f.channel();
return null;
}
public static void connect(String ip, String port)
throws NumberFormatException, InterruptedException {
ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();
DeviceConInfo d = new DeviceConInfo();
d.socketChannel = (SocketChannel) f.channel();
map.put(String.valueOf(1), d);
}
public static void connect(int num, int oneNums, String ip, String port,
int beginID) {
for (int i = 0; i < num; i++) {
try {
ChannelFuture f = b.connect(ip, Integer.parseInt(port)).sync();
System.out.println("====" + MessageManager.getMessageManger()
+ "====" + f.channel());
DeviceConInfo d = new DeviceConInfo();
String strID = String.format("%07d", i + 1 + beginID);
String identiCode = "abcdefghij" + strID;
d.socketChannel = (SocketChannel) f.channel();
d.identiCode = identiCode;
d.onState = BJProtocolConst.CONNECTED;
map.put(identiCode, d);
MessageSendVehicleRegister messagePack = new MessageSendVehicleRegister(
null, 0, d);
MessageManager.getMessageManger().addSocketMessage(messagePack);
Thread.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void connectNums() {
System.out.println("======client nums:" + map.size() + "=====");
}
public static int getOnLineDevices() {
int nums = 0;
for (Map.Entry entry : map.entrySet()) {
DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
if (BJProtocolConst.LOGINED == devConInfo.onState) {
nums++;
}
}
return nums;
}
public static void diConnect() {
for (Map.Entry entry : map.entrySet()) {
DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
if (null != devConInfo.socketChannel)
devConInfo.socketChannel.close();
try {
devConInfo.socketChannel.closeFuture().sync();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
map.remove(entry.getKey());
}
}
public static void stop() {
diConnect();
group.shutdownGracefully();
}
public static DeviceConInfo update(String identiCode,
DeviceConInfo deviceConInfo) {
return map.put(identiCode, deviceConInfo);
}
public static DeviceConInfo get(String identiCode) {
return map.get(identiCode);
}
public static void remove(SocketChannel socketChannel) {
for (Map.Entry entry : map.entrySet()) {
DeviceConInfo devConInfo = (DeviceConInfo) entry.getValue();
if (devConInfo.socketChannel == socketChannel) {
map.remove(entry.getKey());
}
}
}
}
3 个解决方案
#1
这不是全部代码把,莫非你让别人肉眼帮你看代码码?
#2
强大的底层NIO怎么可能,楼主详细说说,机器配置,测试网路
#3
楼主这个问题现在解决了吗 我也遇到了同样的问题
#1
这不是全部代码把,莫非你让别人肉眼帮你看代码码?
#2
强大的底层NIO怎么可能,楼主详细说说,机器配置,测试网路
#3
楼主这个问题现在解决了吗 我也遇到了同样的问题