背景
最近有接触到netty,netty是一个比较流行的网络框架,提供和封装了比较丰富的网络处理API,可以比较容易地用它来实现所需要的功能,很多的中间件采用了netty来处理网络。要注意的是,目前持续支持的是netty4,netty5已经被废弃了,原因有一是引入了ForkJoinPool
但提升不是很大,其他具体的可以看官网作者的讨论,有github地址。
最近也接触到大文件下载等相关的内容,所以正好就开始实战netty的内容。
吐槽
身边有一本《netty权威指南》,之前只看过一些概念性的内容,最近看到这个例子,质量实在是一言难尽,除了理论的东西,其他不建议买这个书看。首先是代码的排版,非常不好;其次书中本次要提到的例子是从netty官网的例子改的(后面会给官网地址),可能由于排版,删掉了很多代码,导致有些地方看得莫名其妙。还有删掉了import的部分,也很让人困惑,导致原始的例子搞了很久才启动成功。还有就是书里貌似也没有告知netty的具体版本,其他各种方面就不多说了,总结来说就是:
代码
简单说明下,代码分两部分,一个启动服务,一个用来支持文件的下载。是在官网的例子和书里的例子基础上修改的,后面本地用1.4G的视频做了验证,达到了想要的效果。重要的地方做了注释,可以仔细看下代码。注意本地使用的ubuntu+java+maven
,netty的依赖版本4.1.50.Final
启动服务
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.junit.Test;
public class HttpFileServer {
//文件的目录,根据需要自己设置
private static final String DEFAULT_URL = "/home/me/download";
public void run(final int port, final String url) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//!!这里需要注意下,如果有问题可能导致服务无法正常返回,注意参考官网例子。
socketChannel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
socketChannel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
socketChannel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
socketChannel.pipeline().addLast("fileServerHandler", new HttpFileServerHandler(url));
}
});
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("file server start ...");
System.out.println("port:" + port + ", url:" + url);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Test
public void runFileServer() throws Exception {
int port = 9993;
new HttpFileServer().run(port, DEFAULT_URL);
}
}
下载处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_0;
public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
private static final Pattern ALLOW_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
private final String url;
public HttpFileServerHandler(String url) {
this.url = url;
}
//展示文件的目录
private static void sendListing(ChannelHandlerContext ctx, File dir) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
StringBuilder builder = new StringBuilder();
builder.append("<html><body>\r\n");
builder.append("list:\r\n");
builder.append("<ul>");
builder.append("<li> link:<a href=\"..\">..</a></li>\r\n");
for (File f : dir.listFiles()) {
if (!ALLOW_FILE_NAME.matcher(f.getName()).matches()) {
continue;
}
builder.append("<li> link:<a href=\"").append(f.getAbsolutePath()).append("\">").append(f.getAbsolutePath()).append("</a></li>");
}
builder.append("</ul></body></html>\r\n");
ByteBuf buffer = Unpooled.copiedBuffer(builder, StandardCharsets.UTF_8);
response.content().writeBytes(buffer);
buffer.release();
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (!request.decoderResult().isSuccess()) {
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
if (request.method() != HttpMethod.GET) {
sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
final String uri = request.uri();
final String path = sanitizeUri(uri);
if (null == path) {
sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
File file = new File(path);
if (!file.exists() || file.isHidden()) {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
if (file.isDirectory()) {
if (uri.startsWith("/")) {
sendListing(ctx, file);
} else {
sendRedirect(ctx, uri + "/");
}
return;
}
if (!file.isFile()) {
sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r");
} catch (Exception e) {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
long length = randomAccessFile.length();
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
//设置content-length大小
HttpUtil.setContentLength(httpResponse, length);
setContentType(httpResponse, file);
if (!HttpUtil.isKeepAlive(request)) {
httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
} else if (request.protocolVersion().equals(HTTP_1_0)) {
httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Write the initial line and the header.
ctx.write(httpResponse);
// Write the content.
//处理分段
ChannelFuture sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(randomAccessFile, 8192)), ctx.newProgressivePromise());
//显示进度,可以用稍大的文件实验
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
if (total < 0) {
System.err.println("progress:" + progress);
} else {
//!!可以用来测试
//TimeUnit.SECONDS.sleep(1);
System.out.println(String.format("progress: %s\t%s", progress, total));
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println(future.channel() + "complete.");
}
});
if (!HttpUtil.isKeepAlive(request)) {
sendFileFuture.addListener(ChannelFutureListener.CLOSE);
}
}
private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.content().writeCharSequence("失败", StandardCharsets.UTF_8);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
//给地址转码
private String sanitizeUri(String uri) {
try {
uri = URLDecoder.decode(uri, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, StandardCharsets.ISO_8859_1.name());
} catch (UnsupportedEncodingException e1) {
throw new Error();
}
}
if (!uri.startsWith(url)) {
uri = url + uri;
}
if (!uri.startsWith("/")) {
return null;
}
uri = uri.replace('/', File.separatorChar);
if (uri.contains(File.pathSeparator + ".")
|| uri.contains("." + File.separator)
|| uri.startsWith(".")
|| uri.endsWith(".")
|| INSECURE_URI.matcher(uri).matches()) {
return null;
}
//这里用直接返回的方式
return uri;
// return System.getProperty("user.dir") + File.separator + uri;
}
private void setContentType(HttpResponse response, File file) {
MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap();
response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath()));
}
private void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FOUND);
response.headers().set(HttpHeaderNames.LOCATION, newUri);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
下载大文件的输出结果:
file server start ...
port:9993, url:/home/me/download
progress: 3088384 1433994604
progress: 3088384 1433994604
progress: 3088384 1433994604
progress: 3088384 1433994604
progress: 3088384 1433994604
progress: 3129344 1433994604
progress: 3129344 1433994604
progress: 3129344 1433994604
progress: 3129344 1433994604
progress: 3129344 1433994604
progress: 3170304 1433994604
progress: 3170304 1433994604
progress: 3170304 1433994604
[id: 0x18c57827, L:0.0.0.0/0.0.0.0:9993 ! R:/127.0.0.1:42060]complete.
官网参考
总结
以上就是本期的内容,最终我们达到了分段下载大文件的目的,确实还是实战才能知道是否好用。题外再说,之前也在找相关的例子,国内网上很多都是相同的抄来抄去,很少有自己的东西,大量的质量不好,期待能有个优质的内容分享平台。
网友评论