Netty创建通信服务时使用Nio异步通信, 配置代码(bootstrap.channel(NioSocketChannel.class);),要怎样实现这样一个同步发送消息并接收消息功能,虽然这样做觉得很没必要。
public class ChannelUtil {
public static Object writeMsgSync(Object msg, Channel channel, AttributeKey<Object> attrKey, int timeout) throws Exception{
if(channel.isActive()){
synchronized(channel){
Attribute<Object> attr = channel.attr(attrKey);
synchronized(attr){
attr.set(msg);
System.out.println("发消息:"+msg);
channel.writeAndFlush(msg);
attr.wait(timeout * 1000);
Object ret = attr.get();
System.out.println("收消息:"+ret);
attr.set(null);
if(ret == msg){
return null;
}
return ret;
}
}
}
return null;
}
}
下面的方法需要加入到收消息的Handler(如SimpleChannelInboundHandler)中
@Override
public void onRead(ChannelHandlerContext ctx, CmdMsg msg) throws Exception {
System.out.println("收到消息:"+msg);
Channel channel = ctx.channel();
Attribute<Object> attr = channel.attr(ClientGlobal.CHANNEL_SYNC_KEY);
synchronized(attr){
Object lastMsg = attr.get();
if (lastMsg != null && lastMsg instanceof CmdMsg) {
CmdMsg lastCmdMsg = (CmdMsg) lastMsg;
if (msg.getCmdCode() == lastCmdMsg.getCmdCode()) {
// 当前消息就是正在等待返回的响应消息,同步消息
attr.set(msg);
attr.notify();
System.out.println("此消息是同步响应消息:" + msg);
return;
}
else{
if(msg.getCmdCode() == CmdConst.CMD_RET){
RetMsg retMsg = (RetMsg)PackUtil.unpackCmdMsg(msg, RetMsg.class);
if(retMsg.getOpCode() == lastCmdMsg.getCmdCode()){
attr.set(msg);
attr.notify();
System.out.println("此消息是同步响应消息:" + msg);
return;
}
}
}
}
}
System.out.println("消息加入到队列中:"+msg);
MsgTask msgTask = new MsgTask();
msgTask.setCtx(ctx);
msgTask.setMsg(msg);
BaseProcessor<?> processor = msgModule.getMsgProcessor(msg.getCmdCode());
if(processor == null){
//没有注册对应的消息加工器
System.out.println("没有注册对应的消息加工器:msg code is "+msg.getCmdCode());
return;
}
msgTask.setProcessor(processor);
msgGueueMgr.pushMsgTask(msgTask);
}
实现原理:通过Channel和Channel中的Attribute,利用wait、Notify,实现线程间通信
1: 发送消息时,将channel中的属性设值,然后提交到Netty内部消息队列,等待一个超时时间,如果这段时间内,有人将其唤醒,则取出channel的属性值,这个就是返回的消息;
2:接收消息时,判断channel中的属性是否有设值,若有,判断当前消息是否是此属性中消息的响应消息,若是,则唤醒channel属性。