2、Netty java序列化服务端开发
Netty服务端接收到客户端的用户订购请求消息,消息定义如下:
服务端接收到消息后,合理化验证后,则订单成功的应答消息返回给客户端,应答消息如下
public class SubscribeReq implements Serializable {
/**
* 默认的序列号ID
* 订单消息
*/
private static final long serialVersionUID = 1L;
private int subReqID;
private String userName;
private String productName;
private String phoneNumber;
private String address;
public final int getSubReqID() {
return subReqID;
}
public final void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public final String getUserName() {
return userName;
}
public final void setUserName(String userName) {
this.userName = userName;
}
public final String getProductName() {
return productName;
}
public final void setProductName(String productName) {
this.productName = productName;
}
public final String getPhoneNumber() {
return phoneNumber;
}
public final void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public final String getAddress() {
return address;
}
public final void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "SubscribeReq [subReqID=" + subReqID + ", userName=" + userName
+ ", productName=" + productName + ", phoneNumber="
+ phoneNumber + ", address=" + address + "]";
}
}
//回应消息
public class SubscribeResp implements Serializable {
/**
* 默认序列ID
* 消息应答
*/
private static final long serialVersionUID = 1L;
private int subReqID;
private int respCode;
private String desc;
public final int getSubReqID() {
return subReqID;
}
public final void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public final int getRespCode() {
return respCode;
}
public final void setRespCode(int respCode) {
this.respCode = respCode;
}
public final String getDesc() {
return desc;
}
public final void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "SubscribeResp [subReqID=" + subReqID + ", respCode=" + respCode
+ ", desc=" + desc + "]";
}
}
//服务器启动类
public class SubReqServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//增加解码器
ch.pipeline().addLast(new ObjectDecoder(1024 * 1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
//增加编码器
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqServer().bind(port);
}
}
//服务器处理类
public class SubReqServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
SubscribeReq req = (SubscribeReq) msg;
if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
System.out.println("Service accept client subscrib req : ["
+ req.toString() + "]");
ctx.writeAndFlush(resp(req.getSubReqID()));
}
}
private SubscribeResp resp(int subReqID) {
SubscribeResp resp = new SubscribeResp();
resp.setSubReqID(subReqID);
resp.setRespCode(0);
resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return resp;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();// 发生异常,关闭链路
}
}
3、
Netty java序列化客户端开发
//客户端启动类
public class SubReqClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ObjectDecoder(1024, ClassResolvers
.cacheDisabled(this.getClass()
.getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubReqClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new SubReqClient().connect(port, "127.0.0.1");
}
}
//客户端handler处理类
public class SubReqClientHandler extends ChannelHandlerAdapter {
/**
* Creates a client-side handler.
*/
public SubReqClientHandler() {
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 10; i++) {
ctx.write(subReq(i));
}
ctx.flush();
}
private SubscribeReq subReq(int i) {
SubscribeReq req = new SubscribeReq();
req.setAddress("北京市丰台区丰台科技园");
req.setPhoneNumber("138xxxxxxxxx");
req.setProductName("Netty 最佳实践和原理分析");
req.setSubReqID(i);
req.setUserName("Lilinfeng");
return req;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Receive server response : [" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Netty java序列化的有点:用户只要通过短短的几行代码就能够完成entity的序列化和反序列化,在业务处理handler中,用户只需要将精力聚焦在业务逻辑的实现上,不需要关心底层的编解码细节,提高了开发效率。
总结:利用了Netty提供的ObjectEncoder编码器和ObjectDecoder编码器实现对普通entity对象的序列化,通过对图书的订购实例,并且模拟了TCP粘包和拆包场景。