先转载一篇 【初学与研发之NETTY】netty3之文件上传 http://blog.csdn.net/mcpang/article/details/41139859
客户端:
[java] view plain copy package netty3.socket.client; import static org.jboss.netty.channel.Channels.pipeline; import java.io.File;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpClientCodec;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import org.jboss.netty.handler.codec.http.multipart.HttpDataFactory;
import org.jboss.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import org.jboss.netty.handler.codec.http.multipart.InterfaceHttpData;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil; public class UploadFileClient
{
private ClientBootstrap bootstrap = null; private ChannelFuture future = null; private HttpDataFactory factory = null; // 服务端处理完成后返回的消息
private StringBuffer retMsg = new StringBuffer(); public UploadFileClient()
{
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new UploadChannelFactory()); // 连接超时时间为3s
bootstrap.setOption("connectTimeoutMillis", 3000); future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 2777)); // 获得一个阈值,它是来控制上传文件时内存/硬盘的比值,防止出现内存溢出
factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
} /**
* 方法描述:关闭文件发送通道(为阻塞式)
*/
public void shutdownClient()
{
// 等待数据的传输通道关闭
future.getChannel().getCloseFuture().awaitUninterruptibly(); bootstrap.releaseExternalResources(); // Really clean all temporary files if they still exist
factory.cleanAllHttpDatas();
} /**
* 方法描述:获取发送文件过程中服务端反馈的消息
* @return 服务端反馈的消息
*/
public String getRetMsg()
{
return retMsg.toString();
} /**
* 方法描述:将文件上传到服务端
* @param file 待上传的文件
*/
public void uploadFile(File file)
{
if (!file.canRead())
{
return;
} // Simple Post form: factory used for big attributes
List<InterfaceHttpData> bodylist = formpost(file);
if (bodylist == null)
{
return;
} // Multipart Post form: factory used
uploadFileToServer(file.getName(), factory, bodylist);
} /**
* @param file
* @return
*/
private List<InterfaceHttpData> formpost(File file)
{
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, ""); // Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder = null;
try
{
bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, false);
bodyRequestEncoder.addBodyAttribute("getform", "POST");
bodyRequestEncoder.addBodyFileUpload("myfile", file, "application/x-zip-compressed", false);
}
catch(Exception e)
{
// should not be since args are not null
e.printStackTrace();
return null;
} // Create the bodylist to be reused on the last version with Multipart support
List<InterfaceHttpData> bodylist = bodyRequestEncoder.getBodyListAttributes(); return bodylist;
} /**
* Multipart example
*/
private void uploadFileToServer(String fileName, HttpDataFactory factory, List<InterfaceHttpData> bodylist)
{
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess())
{
future.getCause().printStackTrace();
bootstrap.releaseExternalResources();
return;
} // Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, fileName); // 设置该属性表示服务端文件接收完毕后会关闭发送通道
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); // Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder = null;
try
{
bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true);
bodyRequestEncoder.setBodyHttpDatas(bodylist);
bodyRequestEncoder.finalizeRequest();
}
catch(Exception e)
{
// should not be since no null args
e.printStackTrace();
}
System.out.println("开始时间:"+System.currentTimeMillis());
// send request
channel.write(request); // test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked())
{
channel.write(bodyRequestEncoder).awaitUninterruptibly();
} // Now no more use of file representation (and list of HttpData)
bodyRequestEncoder.cleanFiles();
} private class UploadChannelFactory implements ChannelPipelineFactory
{ public ChannelPipeline getPipeline() throws Exception
{
ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("codec", new HttpClientCodec());
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("handler", new UploadClientHandler()); return pipeline;
}
} private class UploadClientHandler extends SimpleChannelUpstreamHandler
{
private boolean readingChunks; /**
* 方法描述:接收服务端返回的消息
* @param ctx 发送消息的通道对象
* @param e 消息发送事件对象
*/
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
if (!readingChunks)
{
HttpResponse response = (HttpResponse)e.getMessage(); // 收到服务端反馈的消息,并且链接正常、且还有后续消息
if (response.getStatus().getCode() == 200 && response.isChunked())
{
readingChunks = true;
}
else
{
// 服务端有反馈消息,但没有后续的消息了
ChannelBuffer content = response.getContent();
if (content.readable())
{
retMsg.append(content.toString(CharsetUtil.UTF_8));
}
}
}
else
{
HttpChunk chunk = (HttpChunk)e.getMessage();
if (chunk.isLast())
{
// 服务端的消息接收完毕
readingChunks = false;
}
else
{
// 连续接收服务端发过来的消息
retMsg.append(chunk.getContent().toString(CharsetUtil.UTF_8));
}
}
} /**
* 方法描述:消息接收或发送过程中出现异常
* @param ctx 发送消息的通道对象
* @param e 异常事件对象
*/
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
{
System.out.println("异常--:" + e.getCause());
e.getChannel().close(); // 有异常后释放客户端占用的通道资源
shutdownClient();
}
}
} 服务端:
[java] view plain copy package netty3.socket.server; import static org.jboss.netty.channel.Channels.pipeline; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.stream.ChunkedWriteHandler; public class InitServer
{
private static InitServer sockServer = null; private static ServerBootstrap bootstrap = null; public static InitServer getInstance()
{
if (sockServer == null)
{
sockServer = new InitServer();
}
return sockServer;
} public InitServer()
{
bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory()
{
public ChannelPipeline getPipeline() throws Exception
{
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("handler", new ServerHandler()); return pipeline;
} }); bootstrap.bind(new InetSocketAddress("127.0.0.1", 2777));
} public void shutdownServer()
{
bootstrap.releaseExternalResources();
}
} [java] view plain copy package netty3.socket.server; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.DATE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.TimeZone; import javax.activation.MimetypesFileTypeMap; import netty3.socket.client.SendMsgClient; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelFutureProgressListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DefaultFileRegion;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FileRegion;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.multipart.Attribute;
import org.jboss.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import org.jboss.netty.handler.codec.http.multipart.DiskFileUpload;
import org.jboss.netty.handler.codec.http.multipart.FileUpload;
import org.jboss.netty.handler.codec.http.multipart.HttpDataFactory;
import org.jboss.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import org.jboss.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;
import org.jboss.netty.handler.codec.http.multipart.InterfaceHttpData;
import org.jboss.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedFile;
import org.jboss.netty.util.CharsetUtil; public class ServerHandler extends SimpleChannelHandler
{
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; public static final String HTTP_DATE_GMT_TIMEZONE = "GMT"; public static final int HTTP_CACHE_SECONDS = 60; private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // Disk if size exceed MINSIZE private HttpPostRequestDecoder decoder; private HttpRequest request; private String receiveFileName = ""; private Map<String, String> msgMap = new HashMap<String, String>(); private boolean readingChunks = false; static
{
DiskFileUpload.baseDirectory = "/home/build1/file_test/";
} public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
if (e.getMessage() instanceof HttpRequest)
{
HttpRequest request = (DefaultHttpRequest)e.getMessage();
String uri = sanitizeUri(request.getUri()); System.out.println(request.isChunked()); if (request.getMethod() == HttpMethod.POST)
{
// 接收客户端上传的文件
receiveFileName = uri;
this.request = request; // clean previous FileUpload if Any
if (decoder != null)
{
decoder.cleanFiles();
decoder = null;
} // if GET Method: should not try to create a HttpPostRequestDecoder
try
{
decoder = new HttpPostRequestDecoder(factory, request);
}
catch(Exception e1)
{
e1.printStackTrace();
writeResponse(e.getChannel(), "接收文件信息时出现异常:" + e1.toString());
Channels.close(e.getChannel());
return;
} if (!request.isChunked())
{
readHttpDataAllReceive(e.getChannel());
writeResponse(e.getChannel(), "服务端文件接收完毕!");
}
}
}
else
{
// New chunk is received
HttpChunk chunk = (HttpChunk)e.getMessage();
// example of reading only if at the end
if (!chunk.isLast())
{
try
{
decoder.offer(chunk);
}
catch(Exception e1)
{
e1.printStackTrace();
writeResponse(e.getChannel(), "接收文件数据时出现异常:" + e1.toString());
Channels.close(e.getChannel());
return;
} // example of reading chunk by chunk (minimize memory usage due to Factory)
readHttpDataChunkByChunk(); } else {
readHttpDataAllReceive(e.getChannel()); //最后数据
//writeResponse(e.getChannel(), "服务端数据接收完毕!");
String sendMsg = msgMap.get("sendMsg");
System.out.println("服务端收到消息:" + sendMsg); sendReturnMsg(ctx, HttpResponseStatus.OK, "服务端返回的消息!");
}
}
} /**
* Example of reading all InterfaceHttpData from finished transfer
*/
private void readHttpDataAllReceive(Channel channel)
{
List<InterfaceHttpData> datas;
try
{
datas = decoder.getBodyHttpDatas();
}
catch(Exception e1)
{
e1.printStackTrace();
writeResponse(channel, "接收文件数据时出现异常:" + e1.toString());
Channels.close(channel);
return;
} for (InterfaceHttpData data : datas)
{
writeHttpData(data);
}
} /**
* Example of reading request by chunk and getting values from chunk to chunk
*/
private void readHttpDataChunkByChunk()
{
try
{
while(decoder.hasNext())
{
InterfaceHttpData data = decoder.next();
if (data != null)
{
// new value
writeHttpData(data);
}
}
}
catch(EndOfDataDecoderException e1)
{
e1.printStackTrace();
}
} private void writeHttpData(InterfaceHttpData data)
{
if (data.getHttpDataType() == HttpDataType.FileUpload)
{
FileUpload fileUpload = (FileUpload)data;
if (fileUpload.isCompleted())
{
try
{
Random r = new Random();
StringBuffer fileNameBuf = new StringBuffer();
fileNameBuf.append(DiskFileUpload.baseDirectory).append("U").append(System.currentTimeMillis());
fileNameBuf.append(String.valueOf(r.nextInt(10))).append(String.valueOf(r.nextInt(10)));
fileNameBuf.append(receiveFileName.substring(receiveFileName.lastIndexOf("."))); fileUpload.renameTo(new File(fileNameBuf.toString()));
}
catch(IOException e)
{
e.printStackTrace();
}
System.out.println("结束时间:"+System.currentTimeMillis());
}
else
{
System.out.println("\tFile to be continued but should not!\r\n");
}
}
else if (data.getHttpDataType() == HttpDataType.Attribute)
{
Attribute attribute = (Attribute)data;
try
{
msgMap.put(attribute.getName(), attribute.getString());
}
catch(IOException e)
{
e.printStackTrace();
}
}
} private void writeResponse(Channel channel, String retMsg)
{
// Convert the response content to a ChannelBuffer.
ChannelBuffer buf = ChannelBuffers.copiedBuffer(retMsg, CharsetUtil.UTF_8); // Decide whether to close the connection or not.
boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.getHeader(HttpHeaders.Names.CONNECTION))
|| request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
&& !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.getHeader(HttpHeaders.Names.CONNECTION)); // Build the response object.
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setContent(buf);
response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); if (!close)
{
// There's no need to add 'Content-Length' header
// if this is the last response.
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
} // Write the response.
ChannelFuture future = channel.write(response);
// Close the connection after the write operation is done if necessary.
if (close)
{
future.addListener(ChannelFutureListener.CLOSE);
}
} private String sanitizeUri(String uri)
{
try
{
uri = URLDecoder.decode(uri, "UTF-8");
}
catch(UnsupportedEncodingException e)
{
try
{
uri = URLDecoder.decode(uri, "ISO-8859-1");
}
catch(UnsupportedEncodingException e1)
{
throw new Error();
}
} return uri;
} /**
* 方法描述:设置请求响应的header信息
* @param response 请求响应对象
* @param fileToCache 下载文件
*/
private static void setContentTypeHeader(HttpResponse response, File fileToCache)
{
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.setHeader(CONTENT_TYPE, mimeTypesMap.getContentType(fileToCache.getPath())); SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE)); // Date header
Calendar time = new GregorianCalendar();
response.setHeader(DATE, dateFormatter.format(time.getTime())); // Add cache headers
time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
response.setHeader(EXPIRES, dateFormatter.format(time.getTime()));
response.setHeader(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
response.setHeader(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
} /**
* 方法描述:给客户端发送反馈消息
* @param ctx 发送消息的通道
* @param status 状态
* @param retMsg 反馈消息
*/
private static void sendReturnMsg(ChannelHandlerContext ctx, HttpResponseStatus status, String retMsg)
{
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
response.setContent(ChannelBuffers.copiedBuffer(retMsg, CharsetUtil.UTF_8)); // 信息发送成功后,关闭连接通道
ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
} public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
if (decoder != null)
{
decoder.cleanFiles();
}
System.out.println("连接断开:" + e.getChannel().getRemoteAddress().toString());
} public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
String remoteIp = e.getChannel().getRemoteAddress().toString();
System.out.println(remoteIp.substring(1, remoteIp.indexOf(":")));
System.out.println("收到连接:" + e.getChannel().getRemoteAddress().toString());
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
{
Channel ch = e.getChannel();
Throwable cause = e.getCause();
if (cause instanceof TooLongFrameException)
{
return;
} System.err.println("连接的通道出现异常:" + cause.toString());
if (ch.isConnected())
{
System.out.println("连接还没有关闭!");
ch.close();
}
} }
我在此基础上仿写了一个http server, 但是当执行到 readHttpDataAllReceive(e.getChannel()); //最后数据 这里面的方法体datas = decoder.getBodyHttpDatas();的时候就会报错,我首先忽略这部分异常不返回,还是继续走下面的读取代码,虽然程序没有报错,但是最后获取到的数据就丢失了部分
NotEnoughDataDecoderException
public List<InterfaceHttpData> getBodyHttpDatas()
throws NotEnoughDataDecoderException {
if (!isLastChunk) {
throw new NotEnoughDataDecoderException();
}
return bodyListHttpData;
}
isLastChunk=false得时候就会触发这个异常,但是我们之前的代码流程不是已经到了最后一个chunk了吗,怎么回事?我们再看看isLastChunk的代码
public void offer(HttpChunk chunk) throws ErrorDataDecoderException {
ChannelBuffer chunked = chunk.getContent();
if (undecodedChunk == null) {
undecodedChunk = chunked;
} else {
//undecodedChunk = ChannelBuffers.wrappedBuffer(undecodedChunk, chunk.getContent());
// less memory usage
undecodedChunk = ChannelBuffers.wrappedBuffer(
undecodedChunk, chunked);
}
if (chunk.isLast()) {
isLastChunk = true;
}
parseBody();
}
offer方法里会触发isLastChunk=true, 那问题就清晰了,我们再回到readHttpDataAllReceive(e.getChannel()); //最后数据 这段代码里,在这之前也加上
try
{
decoder.offer(chunk);
}
catch(Exception e1)
{
e1.printStackTrace();
return;
}
问题解决,接收数据也完整
*也提及过这个问题
https://*.com/questions/23989217/posting-data-to-netty-with-apache-httpclient
To solve the problem you either need to offer()
all chunks (HttpContent
) of a message to HttpPostRequestDecoder
before calling getBodyHttpDatas()
, or alternatively you can just add the HttpObjectAggregator
handler right before your handler to the channel's pipeline. If you do so, HttpObjectAggregator
will collect all chunks for you and produce a single FullHttpRequest
in place of multiple chunks. Passing FullHttpRequest
instead of an ordinary HttpRequest
to HttpPostRequestDecoder
's constructor eliminates need to offer()
chunks.
意思是通过offer方法把一个http请求的若干个chunk合在一起,在调用getBodyHttpDatas()前必须使用offer()方法把message完整化
刚开始写这个http server的时候,客户端发送数据很少,包体不会拆分成若干个chunk,一切都很正常,后来当客户端发送数据很大的时候,使用之前的httpRequest根本没有办法获取到数据
PS: 一个http请求里的拆分成若干个chunk后,channelid是一样的, 我们通过这个channelid来重新组装数据