需求
基础
netty 中接收一个数据处理流程 inboundHandler1->inboundHandler2->inboundHandler3
netty 发送一个数据的处理流程outboundHandler3->outboundHandler2->outboundHandler1
我们使用 netty 开发的时候很多初始化的代码都是重复的,一般都是 handler(数据的处理逻辑) 会根据业务的不同进行变化。
我们现在实现 spring 配置文件中动态的配置 handler。
在 spring 的配置文件 中可以这样动态的配置 handler 。
<constructor-arg name="adapters">
<list>
<value>inbound1</value>
<value>inbound2</value>
<value>serverHandler</value>
<value>outbound1</value>
<value>outbound2</value>
</list>
</constructor-arg>
实现
首先我们先实现一个 netty tcp server 入口类 NettyTcpServer.
package netty;
public interface IServer {
/**
* 启动服务器
*/
void start();
/**
* 停止服务器
*/
void stop();
}
/**
* 创建日期: 2017/10/18
* 创建作者:helloworldyu
* 文件名称:
* 功能:
*/
package netty;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .Slf4JLoggerFactory;
import org.;
import org.;
/**
* 功能:
*
* 创建作者:helloworldyu
* 文件名称:
* 创建日期: 2017/10/18
*/
public class NettyTcpServer implements IServer{
private static final Logger logger = ();
/**
* 初始化 netty 的日志系统
*/
static {
();
}
private int port = 8080;
private ChannelInitializer<SocketChannel> channelInitializer;
/**
* 接收请求的 nio 池
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup();
/**
* 接收数据的 nio 池
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public ChannelInitializer<SocketChannel> getChannelInitializer() {
return channelInitializer;
}
public void setChannelInitializer(ChannelInitializer<SocketChannel> channelInitializer) {
this.channelInitializer = channelInitializer;
}
@Override
public void start() {
ServerBootstrap b = new ServerBootstrap();
//指定接收链接的 NioEventLoop,和接收数据的 NioEventLoop
(bossGroup, workerGroup);
//指定server使用的 channel
();
//初始化处理请求的编解码,处理响应类等
(channelInitializer);
(ChannelOption.SO_BACKLOG,1024);
(ChannelOption.SO_REUSEADDR,true);
try {
// 服务器绑定端口监听
(port).sync();
("启动服务器成功,port={}",port);
}catch (InterruptedException e){
//错误日志
("启动服务器报错:",e);
}
}
@Override
public void stop(){
//异步关闭 EventLoop
Future<?> future = ();
Future<?> future1 = ();
//等待关闭成功
();
();
("退出服务器成功");
}
}
这个类有两个重要的参数 port 和 channelInitializer。其中 port 为监听的端口号。channelInitializer 为服务初始化相关的,这个类也是我们的重点。
初始化类 NettyTcpServerInitializer
NettyTcpServerInitializer 继承了ChannelInitializer,同时实现了 spring 的 ApplicationContextAware 接口。在 bean 初始化的时候获取所有的 spring 的上下文信息(这里为了获取其他的 bean)。
核心部分向 pipeline 中添加 handler 的时候是通过传进来的 bean 名字从 spring 中获取的。这也是为什么我们实现了 spring 的ApplicationContextAware。另外由于我经常会用到 IdleStateHandler 这个超时处理 handler 所以在这里专门单独处理了。实际上可以和普通的 handler 来实现。
("logging",new LoggingHandler(this.logLevel));
//添加超时处理器
if( null != this.idleStateHandler ){
IdleStateHandler handler = (IdleStateHandler)getBean(this.idleStateHandler);
(handler);
}
//添加处理器
this.().forEach(c->{
//通过初始化传进来的 bean 名字来初始化获取 bean
ChannelHandlerAdapter handler = (ChannelHandlerAdapter)getBean(c);
(handler);
});
完整代码:
/**
* 创建日期: 2017/10/18
* 创建作者:helloworldyu
* 文件名称:
* 功能:
*/
package netty;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import org.;
import org.;
import ;
import ;
import ;
import ;
/**
* 功能: nettyserver 初始化相关的参数
*
* * @author helloworldyu
* 文件名称:
* 创建日期: 2017/10/18
*/
public class NettyTcpServerInitializer extends ChannelInitializer<SocketChannel> implements ApplicationContextAware{
private static final Logger logger = (NettyTcpServerInitializer.class);
/**
* spring 的 bean 信息
*/
private static ApplicationContext applicationContext;
/**
* 重写以获取 spring 的 bean 信息
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// The spring application context.
= applicationContext;
}
/**
* 根据beanName获取bean
*/
public static Object getBean(String beanName)
{
if (null == beanName)
{
return null;
}
return (beanName);
}
/**
* netty 日志等级
*/
private LogLevel logLevel;
/**
* 所有handler处理适配器
*/
private List<String> adapters;
/**
* 空闲超时处理器
*/
private String idleStateHandler;
/**
* 没有超时处理的版本
* @param logLevel
* @param adapters
*/
public NettyTcpServerInitializer(LogLevel logLevel, List<String> adapters){
this.logLevel = logLevel;
this.adapters = adapters;
}
/**
* 有超时处理器的版本
* @param logLevel 日志等级
* @param idleStateHandler 超时处理器
* @param adapters 入站处理
*/
public NettyTcpServerInitializer(LogLevel logLevel,
String idleStateHandler,
List<String> adapters) {
this.logLevel = logLevel;
this.adapters = adapters;
this.idleStateHandler = idleStateHandler;
}
@Override
protected void initChannel(SocketChannel ch){
ChannelPipeline pipeline = ();
//设置日志
("logging",new LoggingHandler(this.logLevel));
//添加超时处理器
if( null != this.idleStateHandler ){
IdleStateHandler handler = (IdleStateHandler)getBean(this.idleStateHandler);
(handler);
}
//添加处理器
this.().forEach(c->{
//通过初始化传进来的 bean 名字来初始化获取 bean
ChannelHandlerAdapter handler = (ChannelHandlerAdapter)getBean(c);
(handler);
});
("新客户端链接:{}",this.toString());
}
@Override
public String toString() {
return "\n========================================================================\n"+
"NettyTcpServerInitializer\n" +
"logLevel=" + logLevel + "\n"+
"adapters=" + adapters + "\n"+
"idleStateHandlers=" + idleStateHandler +"\n"+
"========================================================================\n"
;
}
}
下面是spring 中的配置文件
配置文件中的 inbound1,inbound2,serverHandler,outbound1,outbound2 是测试用的代码。后面会给全。
核心部分: 添加不通的 handler
<bean id="channelInitializer" class="" scope="prototype">
<!--netty 调试日志的输出等级-->
<constructor-arg name="logLevel" value="DEBUG"/>
<constructor-arg name="idleStateHandler" value="idleStateHandler"/>
<constructor-arg name="adapters">
<list>
<value>inbound1</value>
<value>inbound2</value>
<value>serverHandler</value>
<value>outbound1</value>
<value>outbound2</value>
</list>
</constructor-arg>
</bean>
完整的代码:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="/schema/beans"
xmlns:xsi="http:///2001/XMLSchema-instance"
xmlns:context="/schema/context"
xsi:schemaLocation="/schema/beans
/schema/beans/spring-beans-4.
/schema/context
/schema/context/spring-context-4.">
<!-- 扫描bean -->
<context:component-scan base-package="">
</context:component-scan>
<!-- 加载项目配置文件 -->
<context:property-placeholder location=""/>
<!--=============初始化 tcpserver ==========-->
<!--指定处理的编解码器,响应处理器-->
<!--要注意是否可以重入,可以的话记得加 @Sharedable,否则scope 要是 prototype-->
<bean id="inbound1" class="" scope="prototype"/>
<bean id="inbound2" class="" scope="prototype"/>
<bean id="serverHandler" class="" scope="prototype"/>
<bean id="outbound1" class="" scope="prototype"/>
<bean id="outbound2" class="" scope="prototype"/>
<!--超时处理器-->
<bean id="idleStateHandler" class="" scope="prototype">
<constructor-arg name="readerIdleTime" value="0"/>
<constructor-arg name="writerIdleTime" value="0"/>
<constructor-arg name="allIdleTime" value="5"/>
<constructor-arg name="unit" value="SECONDS"/>
</bean>
<!--指定处理的编解码器,响应处理器-->
<!--要注意是否可以重入,可以的话记得加 @Sharedable,否则scope 要是 prototype-->
<bean id="channelInitializer" class="" scope="prototype">
<!--netty 调试日志的输出等级-->
<constructor-arg name="logLevel" value="DEBUG"/>
<constructor-arg name="idleStateHandler" value="idleStateHandler"/>
<constructor-arg name="adapters">
<list>
<value>inbound1</value>
<value>inbound2</value>
<value>serverHandler</value>
<value>outbound1</value>
<value>outbound2</value>
</list>
</constructor-arg>
</bean>
<bean id="tpcserver" class=""
scope="singleton" init-method="start" destroy-method="stop">
<!--初始化类,和端口号-->
<property name="port" value="8899"/>
<property name="channelInitializer" ref="channelInitializer"/>
</bean>
</beans>
完整代码
完整源代码:spring-netty 模块
/yuhaiyang457288/netty-test
仔细看完有疑惑的,或者有建议的可以加 物联网交流群: 651219170