测试demo和启动
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
/**
 * Minimal Netty HTTP server used to observe how the HTTP codec handles requests that arrive
 * in several TCP fragments.
 *
 * <p>Pipeline per accepted connection: request decoder, response encoder, an aggregator that
 * assembles full requests (up to 512 KiB), and the demo {@code HttpHandler}.
 */
public class HttpServer {
    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public HttpServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new HttpServer(9004).start();
    }

    /**
     * Binds the server socket and returns once the bind has completed. The event loop keeps
     * serving connections on its own threads after this method returns.
     *
     * @throws Exception if the bind fails; the event loop group is shut down in that case so
     *                   its threads do not keep the JVM alive
     */
    public void start() throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();
        boolean bound = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("initChannel ch:" + ch);
                            ch.pipeline()
                                    .addLast("decoder", new HttpRequestDecoder())                  // 1
                                    .addLast("encoder", new HttpResponseEncoder())                 // 2
                                    .addLast("aggregator", new HttpObjectAggregator(512 * 1024))   // 3
                                    .addLast("handler", new HttpHandler());                        // 4
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128) // determining the number of connections queued
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            // sync() rethrows the failure cause when the bind does not succeed.
            b.bind(port).sync();
            bound = true;
        } finally {
            if (!bound) {
                // Without this, a failed start leaves the event loop threads running forever.
                group.shutdownGracefully();
            }
        }
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;
/**
 * Demo handler that answers every aggregated HTTP request with a fixed plain-text body.
 *
 * <p>It receives {@code FullHttpRequest} objects, so it must sit behind an aggregator in the
 * pipeline.
 */
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> { // 1
    /**
     * Builds the {@code 200 OK} response for one request.
     *
     * <p>When the client allows the connection to be reused the response is only queued and
     * flushed later in {@link #channelReadComplete}; otherwise it is flushed immediately and
     * the connection is closed once the write completes.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        System.out.println("class:" + msg.getClass().getName());
        boolean keepAlive = HttpHeaders.isKeepAlive(msg);
        // Encode explicitly as UTF-8 so the bytes match the charset announced in Content-Type,
        // independent of the platform default charset.
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.wrappedBuffer("test".getBytes(StandardCharsets.UTF_8))); // 2
        HttpHeaders heads = response.headers();
        // set() replaces any existing value, so each header appears exactly once.
        heads.set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        heads.set(CONTENT_LENGTH, response.content().readableBytes()); // 3
        if (keepAlive) {
            heads.set(CONNECTION, "keep-alive");
            ctx.write(response);
        } else {
            // The client asked for the connection to be closed: say so and close after writing.
            heads.set(CONNECTION, "close");
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Flushes the responses queued by {@link #channelRead0} once the current read batch ends. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
        super.channelReadComplete(ctx);
        ctx.flush(); // 4
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught");
        if (null != cause) {
            cause.printStackTrace();
        }
        if (null != ctx) {
            ctx.close();
        }
    }
}
package main
import (
"fmt"
"net"
"time"
)
// main connects to the demo server on localhost:9004 and sends a single HTTP
// request split into several writes with pauses in between, so the server-side
// decoder has to reassemble it. Everything the server sends back is printed by
// a background reader goroutine.
func main() {
	c, err := net.Dial("tcp", "localhost:9004")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the socket when main returns.
	defer c.Close()

	go func() {
		// One buffer is enough: string(buf[:n]) copies the bytes before the next Read.
		buf := make([]byte, 100)
		for {
			n, err := c.Read(buf)
			// A Read may return data together with an error; print the data first.
			if n > 0 {
				fmt.Println(string(buf[:n]))
			}
			if err != nil {
				fmt.Print(err)
				return
			}
		}
	}()

	// Request fragments in send order, each followed by the pause that separates
	// it from the next fragment.
	fragments := []struct {
		data  string
		pause time.Duration
	}{
		{"GET /", 200 * time.Millisecond},
		{" HTTP/1.1\r\n", 0},
		{"Host: localhost:9004\r\n", 5 * time.Second},
		{"Accept: */*\r\n", 200 * time.Millisecond},
		{"Content-Length: 0\r\n", 0},
		// Final blank line ends the headers; then wait for the response to be printed.
		{"\r\n", 500*time.Millisecond + 5*time.Second},
	}
	for _, f := range fragments {
		if _, err := c.Write([]byte(f.data)); err != nil {
			fmt.Println(err)
			return
		}
		time.Sleep(f.pause)
	}
}
启动服务端。
启动客户端;也可以用 telnet 连接后手动分段输入 HTTP 协议文本。
执行结果
由执行结果可以看出:即使把 HTTP 请求报文任意拆分成多段发送,并在各段之间插入不同时长的 sleep,服务端仍然能够正确解析并处理该请求。
分析过程
函数调用栈信息:
![](https://img.haomeiwen.com/i11312842/c660e27bd74a323e.png)
代码注释:
io.netty.handler.codec.ByteToMessageDecoder
/**
 * Accumulates inbound {@code ByteBuf} data across reads, passes the accumulated buffer to
 * {@code callDecode}, and fires every object the decoder produced to the next handler.
 * Messages that are not a {@code ByteBuf} are forwarded unchanged.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 1. The channel delivered an I/O read as a ByteBuf. In this walkthrough it may hold a complete
//    HTTP request or only part of one: a header line may be cut short or span several reads,
//    i.e. the protocol unit may still be incomplete.
if (msg instanceof ByteBuf) {
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// 2. No cumulation yet: the incoming buffer itself becomes the cumulation buffer.
if (first) {
cumulation = data;
} else {
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
|| cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
expandCumulation(ctx, data.readableBytes());
}
// 3. A cumulation already exists, meaning bytes from an earlier read are still undecoded,
//    so the bytes of the current read are appended to it.
//    Example: the previous buffer held only "GET /"; the request line was incomplete, so the
//    codec waited for the next read, whose bytes are appended here before decoding is retried.
cumulation.writeBytes(data);
data.release();
}
// 4. Run the decoder on the accumulated bytes.
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
// 5. If the cumulation exists and has no readable bytes left, release it and drop the reference.
//    In other words, it is kept only while it still holds an undecoded fragment.
//    Note (author's explanation): "undecoded fragment" is not the same as "incomplete request".
//    "Host: localhost:9004" followed by a line break is a complete header line even though the
//    request is not finished; the codec has already stored its key/value in the message object,
//    so the buffer can be released without waiting for the rest of the request.
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
int size = out.size();
decodeWasNull = size == 0;
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
io.netty.handler.codec.ReplayingDecoder
/**
 * Repeatedly invokes {@code decode} on the accumulated buffer while it has readable bytes.
 * A {@code Signal} thrown by the decoder rewinds the reader index to the last checkpoint and
 * ends the loop; a decode call that neither consumes input nor changes state is reported as a
 * {@code DecoderException}.
 */
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 1. Receives the accumulated buffer (the caller passes its cumulation here) and exposes it
//    to the decoder through the replayable wrapper.
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size();
S oldState = state;
int oldInputLength = in.readableBytes();
try {
// 2. Run the codec's decode step.
decode(ctx, replayable, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
"data or change its state if it did not decode anything.");
} else {
// Previous data has been discarded or caused state transition.
// Probably it is reading on.
continue;
}
}
} catch (Signal replay) {
// 3. The codec ran out of bytes in the middle of a protocol unit and threw the REPLAY
//    signal, so the same unit is decoded again ("replayed") on the next read.
replay.expect(REPLAY);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
// Return to the checkpoint (or oldPosition) and retry, i.e. reset the buffer's reader index.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
break;
}
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
"or change its state if it decoded something.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
io.netty.handler.codec.http.HttpObjectDecoder
// State-machine step of the HTTP object decoder: depending on the current state it skips
// leading control characters, parses the initial line, then parses the headers and decides how
// the body (if any) will be read. NOTE(review): the article truncates this method after the
// empty-body case; the remaining states are not shown here.
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
// 1. Try to finish parsing the current protocol unit according to the current state.
switch (state()) {
// 2. Before reading a request, skip blank lines or other control characters in the buffer,
//    e.g. repeated Enter key presses typed into a telnet session are filtered out here.
// 3. If no error occurs, the state becomes READ_INITIAL and the checkpoint (reader index) is
//    updated to the current position.
case SKIP_CONTROL_CHARS: {
try {
skipControlCharacters(buffer);
checkpoint(State.READ_INITIAL);
} finally {
checkpoint();
}
// fall-through
}
// 3. The previous case has no break, so execution falls through to here.
//    Parse the initial (request) line. If it has the three parts METHOD URL VERSION, create
//    the message object that later data is decoded into, switch the state to READ_HEADER and
//    update the checkpoint.
//    If the request line is not complete yet, lineParser.parse(buffer) throws REPLAY.
case READ_INITIAL: try {
String[] initialLine = splitInitialLine(lineParser.parse(buffer));
if (initialLine.length < 3) {
// Invalid initial line - ignore.
checkpoint(State.SKIP_CONTROL_CHARS);
return;
}
message = createMessage(initialLine);
checkpoint(State.READ_HEADER);
// fall-through
} catch (Exception e) {
out.add(invalidMessage(e));
return;
}
// 4. readHeaders parses the header lines; an empty line (two consecutive \r\n) ends the
//    headers, and the state advances to whatever readHeaders returned.
//    Depending on the body, the next state may be e.g. READ_CHUNK_SIZE or
//    READ_FIXED_LENGTH_CONTENT.
//    If a header line is incomplete, REPLAY is thrown, the state stays unchanged, and this
//    branch is entered again when the next I/O read arrives.
case READ_HEADER: try {
State nextState = readHeaders(buffer);
checkpoint(nextState);
switch (nextState) {
case SKIP_CONTROL_CHARS:
// fast-path
// No content is expected.
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
reset();
return;
case READ_CHUNK_SIZE:
if (!chunkedSupported) {
throw new IllegalArgumentException("Chunked messages not supported");
}
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
out.add(message);
return;
default:
// 5. If the body is empty, emit the decoded message object right away.
long contentLength = contentLength();
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
reset();
return;
}
...略
}
readHeaders
/**
 * Parses header lines from {@code buffer} into the current message until an empty line is
 * read, then picks the decoder state used for the message body.
 *
 * <p>A line that starts with a space or tab while a header is pending is treated as a
 * continuation of that header's value. If a line is not complete yet,
 * {@code headerParser.parse(buffer)} throws the replay signal and this method is re-entered
 * on the next read with the pending {@code name}/{@code value} still in place.
 *
 * @return the state for reading the body, or {@code SKIP_CONTROL_CHARS} when no body follows
 */
private State readHeaders(ByteBuf buffer) {
    final HttpMessage msg = this.message;
    final HttpHeaders headerMap = msg.headers();

    // 1. Consume lines until the empty line that terminates the header section.
    for (AppendableCharSequence row = headerParser.parse(buffer);
         row.length() > 0;
         row = headerParser.parse(buffer)) {
        final char lead = row.charAt(0);
        final boolean continuation = name != null && (lead == ' ' || lead == '\t');
        if (continuation) {
            // Folded header: append the trimmed line to the pending value, separated by a space.
            value = new StringBuilder(value.length() + row.length() + 1)
                    .append(value)
                    .append(' ')
                    .append(row.toString().trim())
                    .toString();
        } else {
            // A new header starts: commit the pending one first, then split the new line.
            if (name != null) {
                headerMap.add(name, value);
            }
            splitHeader(row);
        }
    }

    // Commit the last pending header, then clear the pending fields.
    if (name != null) {
        headerMap.add(name, value);
    }
    name = null;
    value = null;

    // 2. All headers are parsed; decide how the body will be read.
    if (isContentAlwaysEmpty(msg)) {
        HttpHeaders.removeTransferEncodingChunked(msg);
        return State.SKIP_CONTROL_CHARS;
    }
    if (HttpHeaders.isTransferEncodingChunked(msg)) {
        return State.READ_CHUNK_SIZE;
    }
    return contentLength() >= 0
            ? State.READ_FIXED_LENGTH_CONTENT
            : State.READ_VARIABLE_LENGTH_CONTENT;
}
parser的全部逻辑
/**
 * Reads one line from {@code buffer} into the reusable {@code seq} and returns it.
 *
 * <p>This object is passed to {@code forEachByte} as the per-byte callback. If that call
 * returns normally, the reader index is moved just past the byte at which iteration stopped,
 * so the next call starts on the following line.
 */
public AppendableCharSequence parse(ByteBuf buffer) {
    // Start from an empty sequence for every line.
    seq.reset();
    // 1. Walk the bytes until the callback stops the iteration, then skip past that byte.
    buffer.readerIndex(buffer.forEachByte(this) + 1);
    // Call checkpoint to make sure the readerIndex is updated correctly
    checkpoint();
    return seq;
}
/**
 * Delegates to the wrapped buffer and turns a "not found" result into the replay signal.
 *
 * @return the non-negative index reported by the wrapped buffer
 */
@Override
public int forEachByte(ByteBufProcessor processor) {
    final int found = buffer.forEachByte(processor);
    // 2. A non-negative index means the processor stopped at a byte; hand it back.
    if (found >= 0) {
        return found;
    }
    // Negative result: the processor never stopped the iteration, so signal a replay.
    throw REPLAY;
}
/**
 * Runs {@code processor} over the bytes between the reader index and the writer index.
 *
 * @return whatever {@code forEachByteAsc0} reports for that range
 */
@Override
public int forEachByte(ByteBufProcessor processor) {
    final int from = readerIndex;
    // 3. Number of bytes currently waiting to be read.
    final int readable = writerIndex - from;
    ensureAccessible();
    return forEachByteAsc0(from, readable, processor);
}
/**
 * Feeds bytes to {@code processor} in ascending index order, starting at {@code index}.
 *
 * @return the index of the first byte for which the processor returned {@code false}, or
 *         {@code -1} if {@code length} is zero or the processor accepted every byte
 * @throws NullPointerException if {@code processor} is {@code null}
 */
private int forEachByteAsc0(int index, int length, ByteBufProcessor processor) {
    if (processor == null) {
        throw new NullPointerException("processor");
    }

    // 4. Nothing to read. An HTTP blank line ("\r\n") is two bytes long, so it does not take
    //    this branch.
    if (length == 0) {
        return -1;
    }

    final int limit = index + length;
    int cursor = index;
    try {
        do {
            // 5. Stop at the first byte the processor rejects; otherwise move on to the next.
            if (!processor.process(_getByte(cursor))) {
                return cursor;
            }
            cursor++;
        } while (cursor < limit);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }

    return -1;
}
/**
 * Per-byte callback used while scanning for the end of a line.
 *
 * @return {@code false} when a line feed is reached (ending the scan), {@code true} otherwise;
 *         carriage returns are skipped, every other byte is appended to {@code seq}
 * @throws Exception if {@code size} has already reached {@code maxLength} when another byte
 *                   would be appended
 */
@Override
public boolean process(byte value) throws Exception {
    final char current = (char) value;
    // 6. A line feed terminates the line: returning false stops the caller's iteration.
    if (current == HttpConstants.LF) {
        return false;
    }
    // Carriage returns are skipped without being stored or counted.
    if (current != HttpConstants.CR) {
        if (size >= maxLength) {
            // TODO: Respond with Bad Request and discard the traffic
            // or close the connection.
            // No need to notify the upstream handlers - just log.
            // If decoding a response, just throw an exception.
            throw newException(maxLength);
        }
        size++;
        seq.append(current);
    }
    return true;
}
图示说明
![](https://img.haomeiwen.com/i11312842/b94e6d648eaafd3f.png)
网友评论