美文网首页
netty实现通过私有协议的通信和探活

netty实现通过私有协议的通信和探活

作者: 西5d | 来源:发表于2020-09-04 16:22 被阅读0次

背景

我们知道类似HTTP协议属于一种网络规范,包括数据结构的定义和通信规则。很多情况下,业务可能基于自身的需要,实现自有的协议,比如像Dubbo的dubbo协议。

描述

本期继续netty相关内容,这次我们根据已有例子,自己定义网络协议并完成通信。主要包括协议数据结构的定义,数据的解析和反解析,网络交互规则,而交互包括认证和心跳两部分。
下表是整体的介绍,描述各个部分的功能。

类名称 功能
Header 协议数据的头部
NettyMessage 协议数据,包括头部和数据部分
HeartbeatReqHandler 客户端处理心跳消息
HeartbeatRespHandler 服务端处理心跳消息
LoginAuthReqHandler 客户端处理登录或者握手消息
LoginAuthRespHandler 服务端处理登录或者握手消息
NettyMessageDecoder 从收到的byte字节数据解码成消息体
NettyMessageEncoder 对数据进行编码转换成byte字节发送
NettyProtocolClient 客户端实例
NettyProtocolServer 服务端实例

下面依次介绍下各个部分和包含的代码,内容会比较多。

代码

消息头结构

消息头的定义,如协议标记,用来识别协议,是个固定值。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

/**
 * 自定义协议的消息头
 */
@NoArgsConstructor
@Data
public class Header {
    //协议标记
    private int crcCode = 0xabef0101;
    //消息长度
    private int length;
    private long sessionId;
    //消息类型 MessageType
    private byte type;
    //优先级
    private byte priority;
    //扩展
    private Map<String, Object> attachment = new HashMap<>();

    public Header(byte type) {
        this.type = type;
    }

    //消息类型定义
    @AllArgsConstructor
    @Getter
    public enum MessageType {
        BIZ_REQ((byte) 0),
        BIZ_RESP((byte) 1),
        ONE_WAY_REQ((byte) 2),
        HANDSHAKE_REQ((byte) 3),
        HANDSHAKE_RESP((byte) 4),
        HEARTBEAT_REQ((byte) 5),
        HEARTBEAT_RESP((byte) 6),
        ;
        byte value;
    }

}

消息体结构

消息体结构比较简单,除了头部,剩下的数据用Object处理。

import lombok.Data;

/**
 * 消息结构
 */
@Data
public class NettyMessage {
    private Header header;
    private Object body;
}

消息封装(编码)

这里介绍对消息进行编码,首先处理头部,从消息对象msg中一次读取header的各项内容,然后按照顺序写入到ByteBuf中,注意根据类型的不同,使用的写入方法也是不同的。这里的ByteBuf是netty对byte[]操作的封装,如果自己实现,要麻烦很多。之后进行header.attachment的写入,因为key是string,而value是对象,所以这里我把value先转成String,然后写入大小和内容(实际操作先写入大小,再写入内容),而且因为是对象,把对象的类型提取出来,也写到后面;同样,如果消息的数据部分body不为空,采用和上面一致的方式,写入body转换后的内容大小,也写入body的类型名称。具体的也可以参照代码来处理。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public final class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, List<Object> out) throws Exception {
        if (null == msg || msg.getHeader() == null) {
            throw new Exception("encode message is null!");
        }

        ByteBuf sendBuf = Unpooled.buffer();
        sendBuf.writeInt(msg.getHeader().getCrcCode())
                .writeInt(msg.getHeader().getLength())
                .writeLong(msg.getHeader().getSessionId())
                .writeByte(msg.getHeader().getType())
                .writeByte(msg.getHeader().getPriority())
                .writeInt(msg.getHeader().getAttachment().size());

        byte[] keyArray;
        byte[] valueArray;
        for (Map.Entry<String, Object> entry : msg.getHeader().getAttachment().entrySet()) {
            //写入key,先写大小,再写值,值用Gson转成String,同时把值的类型也写入
            keyArray = entry.getKey().getBytes(StandardCharsets.UTF_8);
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            //写值的数据,先写大小,再写内容
            valueArray = GsonUtil.toJson(entry.getValue()).getBytes(StandardCharsets.UTF_8);
            sendBuf.writeInt(valueArray.length);
            sendBuf.writeBytes(valueArray);
            //写值的类型名称
            byte[] clazzArray = entry.getValue().getClass().getName().getBytes();
            sendBuf.writeInt(clazzArray.length);
            sendBuf.writeBytes(clazzArray);
        }
        if (msg.getBody() != null) {
            byte[] bodyArray = GsonUtil.toJson(msg.getBody()).getBytes(StandardCharsets.UTF_8);
            sendBuf.writeInt(bodyArray.length);
            sendBuf.writeBytes(bodyArray);
            byte[] clazzArray = msg.getBody().getClass().getName().getBytes();
            sendBuf.writeInt(clazzArray.length);
            sendBuf.writeBytes(clazzArray);
        }
        //sendBuf.writeInt(0);
        sendBuf.setInt(4, sendBuf.readableBytes());
        out.add(sendBuf);
    }
}

消息解析(解码)

解码就相当于编码的反操作,依次按照顺序解出内容。首先是消息头部,对于对象类型的处理,是先读取对象的大小,再取内容,再取类型的大小和名称,然后通过Gson转换回java实例对象,如下的打码,可以详细了解下注释的部分。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public final class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        //        if (null == frame) {
        //            return null;
        //        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setCrcCode(in.readInt());
        header.setLength(in.readInt());
        header.setSessionId(in.readLong());
        header.setType(in.readByte());
        header.setPriority(in.readByte());

        int size = in.readInt();
        if (size > 0) {
            Map<String, Object> attch = new HashMap<>();
            int keySize;
            int valueSize;
            byte[] keyArray;
            byte[] valueArray;
            String key;
            String value;
            for (int i = 0; i < size; i++) {
                //读取长度值
                keySize = in.readInt();
                keyArray = new byte[keySize];
                //读取内容
                in.readBytes(keyArray);
                key = new String(keyArray, StandardCharsets.UTF_8);

                valueSize = in.readInt();
                valueArray = new byte[valueSize];
                in.readBytes(valueArray);
                value = new String(valueArray, StandardCharsets.UTF_8);
                int clazzSize = in.readInt();
                byte[] clazzArray = new byte[clazzSize];
                in.readBytes(clazzArray);
                String clazz = new String(clazzArray, StandardCharsets.UTF_8);
                attch.put(key, GsonUtil.parse(value, Class.forName(clazz)));
            }
            header.setAttachment(attch);
        }
        //4代表一个int,超过表示还有数据
        if (in.readableBytes() > 4) {
            //body的长度
            int bodyLength = in.readInt();
            byte[] bodyArray = new byte[bodyLength];
            //body的内容并转成String
            in.readBytes(bodyArray);
            String bodyJson = new String(bodyArray, StandardCharsets.UTF_8);
            int clazzSize = in.readInt();
            byte[] clazzArray = new byte[clazzSize];
            in.readBytes(clazzArray);
            String clazz = new String(clazzArray, StandardCharsets.UTF_8);
            //根据body的内容json和类型名称,转换成对象
            Object body = GsonUtil.parse(bodyJson, Class.forName(clazz));
            message.setBody(body);
        }
        message.setHeader(header);
        return message;
    }
}

附加-json转换

为方便直接运行代码使用,把对应的json转换也放到这里。

import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

@Slf4j
public class GsonUtil {
    private static final Gson GSON =
            new GsonBuilder().disableHtmlEscaping().setDateFormat("yyyy-MM-dd HH:mm:ss").serializeNulls().addSerializationExclusionStrategy(new ExclusionStrategy() {
                @Override
                public boolean shouldSkipField(FieldAttributes fieldAttributes) {
                    return fieldAttributes.getAnnotation(GsonIgnore.class) != null;
                }

                @Override
                public boolean shouldSkipClass(Class<?> aClass) {
                    return false;
                }
            }).create();

    public static String toJson(Object src) {
        return GSON.toJson(src);
    }

    public static <T> T parse(String str, Class<T> classOfT) {
        return GSON.fromJson(str, classOfT);
    }

    /**
     * 解析泛型
     *
     * @param json
     * @param typeToken
     * @param <T>
     * @return
     */
    public static <T> T parse(String json, TypeToken typeToken) {
        return GSON.fromJson(json, typeToken.getType());
    }
}

登录处理

  1. 客户端登录请求
    客户端处理登录请求的代码,核心是在启动的时候在channelActive方法中,发送一个登录注册请求。在channelRead方法中,获取从服务端的返回,并根据协议消息的头部字段,判断类型。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 客户端支持登录请求
 */
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {

    private NettyMessage buildAuthReq() {
        NettyMessage message = new NettyMessage();
        Header header = new Header(Header.MessageType.HANDSHAKE_REQ.value);
        message.setHeader(header);
        return message;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("login auth req handler active ...");
        NettyMessage message = buildAuthReq();
        System.out.println("[send] client send login req ---> " + message);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null && message.getHeader().getType() == Header.MessageType.HANDSHAKE_RESP.value) {
            System.out.println("[recv] client receive login resp ---> " + message);
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                System.out.println("handshake fail!");
                ctx.close();
            } else {
                System.out.println("handshake success.");
                ctx.fireChannelRead(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

  1. 服务端登录请求
    服务端对登录请求的处理,在收到消息后,判断是否是握手消息,如果是握手消息,这里加了个IP白名单的判断,如果判断成功,构造一个握手返回请求,返回给客户端。这里用body的值来判断是否握手成功。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 服务端处理登录握手
 */
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {

    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();
    private String[] whiteList = new String[] {"127.0.0.1"};

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null && message.getHeader().getType() ==  Header.MessageType.HANDSHAKE_REQ.value) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp;
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean ok = false;
                for (String e : whiteList) {
                    if (e.equals(ip)) {
                        ok = true;
                        break;
                    }
                }
                loginResp = ok ? buildResponse((byte) 0) : buildResponse((byte) -1);
                if (ok) {
                    nodeCheck.put(nodeIndex, true);
                }
            }
            System.out.println("[send] login response :" + loginResp + " , body:" + loginResp.getBody());
            ctx.writeAndFlush(loginResp);
        }else {
            ctx.fireChannelRead(msg);
        }
    }

    //构造握手返回请求
    private NettyMessage buildResponse(byte resp) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(Header.MessageType.HANDSHAKE_RESP.value);
        message.setHeader(header);
        message.setBody(resp);
        return message;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        nodeCheck.remove(ctx.channel().remoteAddress().toString());
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}

心跳处理

  1. 客户端心跳处理
    这里看下客户端的心跳处理,有一个调度线程池,操作就是定时地发送心跳消息给服务端。同时channelRead在负责收到消息后,判断执行的操作。这里心跳是在收到握手成功的消息后开始的。注意channelAcitve方法是否执行和netty pipline中的channelHandler执行顺序有关系,比较复杂感谢的同学可以额外了解下。
    这里除了在收到握手消息之后开始心跳探测,后续正常收到心跳返回包,则打印日志。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * 客户端处理心跳请求
 */
public class HeartbeatReqHandler extends ChannelInboundHandlerAdapter {

    private volatile ScheduledFuture<?> heartbeat;

//    @Override
//    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        System.out.println("heartbeat req channel active ...");
//        heartbeat = ctx.executor().scheduleAtFixedRate(new HeartbeatTask(ctx), 0, 5, TimeUnit.SECONDS);
//    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage)msg;
        if (message.getHeader() != null) {
            //heartbeat 稍微调整下代码,因为握手比较少,心跳比较多。
            if (message.getHeader().getType() == Header.MessageType.HEARTBEAT_RESP.value) {
                System.out.println("[recv] client receive server heartbeat message ---> " + message);
            } else
                //login,握手成功以后开始客户端给服务端启动心跳
                if (message.getHeader().getType() == Header.MessageType.HANDSHAKE_RESP.value) {
                    heartbeat = ctx.executor().scheduleAtFixedRate(new HeartbeatTask(ctx), 0, 5, TimeUnit.SECONDS);
                    System.out.println("[recv] client receive server login resp, start heartbeat ---> " + message);
                }
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (null != heartbeat) {
            heartbeat.cancel(true);
            heartbeat = null;
        }
        cause.printStackTrace();
        ctx.fireExceptionCaught(cause);
    }


    private class HeartbeatTask implements Runnable {
        private final ChannelHandlerContext ctx;
        public HeartbeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        @Override
        public void run() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(Header.MessageType.HEARTBEAT_REQ.value);
            message.setHeader(header);
            message.setBody("heartbeat @" + LocalDateTime.now().toString() + " ...");
            System.out.println("[send] client send heartbeat to server ---> " + message);
            ctx.writeAndFlush(message);
        }
    }
}

  1. 服务端心跳处理
    服务端的心跳处理就比较简单了,在收到心跳请求后,发送对等的包给客户端。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;

/**
 * 服务端处理心跳请求
 */
public class HeartbeatRespHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage)msg;

        if (message.getHeader() != null && message.getHeader().getType() == Header.MessageType.HEARTBEAT_REQ.value) {
            System.out.println("[recv] server received heartbeat from client ---> " + message);
            NettyMessage heartbeat = new NettyMessage();
            Header header = new Header();
            header.setType(Header.MessageType.HEARTBEAT_RESP.value);
            heartbeat.setHeader(header);
            heartbeat.setBody("heartbeat server resp @" + LocalDateTime.now());
            System.out.println("[send] server send heartbeat resp to client ---> " + heartbeat);
            ctx.writeAndFlush(heartbeat);
        }else {
            ctx.fireChannelRead(msg);
        }
    }
}

消息客户端

消息处理的客户端程序。注意有一个线程池用来做重试,这里这是简单的一次尝试重连,只做示例。其次,注意pipline的添加顺序可能影响Handler的执行结果。其他可以看下代码注释。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NettyProtocolClient {
    private ExecutorService executor = Executors.newFixedThreadPool(1);

    public void connect(int port, String host) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new NettyMessageDecoder(1024 * 1024, 4, 4))
                                    .addLast(new NettyMessageEncoder())
                                    .addLast(new ReadTimeoutHandler(50))
                                    .addLast(new LoginAuthReqHandler())
                                    .addLast(new HeartbeatReqHandler());
                        }
                    });

            final String localhost = "127.0.0.1";
            final int localPort = 1922;
            //如果加了本地地址和端口,相当于固定了本地端口,如果停止后立刻再次登录,由于之前请求会保留TIME_WAIT状态socket一段时间,会抛出Caused by: java.net.BindException: 地址已在使用
            //没有设置本地端口,每次本地端口是新生成的,短时间内立即请求会消耗大量系统连接资源
            System.out.println("client connect @" + host + ":" + port);
            //注意用了两个sync,没有channelFuture可能不会抛出异常
            ChannelFuture future =
                    bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress(localhost, localPort)).sync().channel().closeFuture().sync();
            future.channel().closeFuture().sync();
            //不会抛异常
            //bootstrap.connect(new InetSocketAddress(host, port), new InetSocketAddress(localhost, localPort)).channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
            //资源释放后,清空,然后发起重连
            //这里加了强制等待线程执行1分钟,否则是直接退出的。
            executor.execute(() -> {
                try {
                    int timeout = 15;
                    System.out.println("retry connect after :" + timeout);
                    TimeUnit.SECONDS.sleep(timeout);
                    try {
                        System.out.println("retry connect ...");
                        connect(port, host);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            if (!executor.isTerminated()) {
                try {
                    executor.awaitTermination(1, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test
    public void runClient() {
        int port = 12342;
        String host = "127.0.0.1";
        connect(port, host);
    }
}

消息服务端

服务端相对要简单一些,添加各个EventLoopGroup之后,追加自定的消息解析器和协议handler,绑定指定端口启动就可以接收情趣了。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.junit.Test;

public class NettyProtocolServer {
    private void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();

        try {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new NettyMessageDecoder(1024 * 1024, 4, 4))
                                    .addLast(new NettyMessageEncoder())
                                    .addLast(new ReadTimeoutHandler(60))
                                    .addLast(new LoginAuthRespHandler())
                                    .addLast(new HeartbeatRespHandler());
                        }
                    });
            System.out.println("netty protocol server@" + port);
            bootstrap.bind(port).channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    @Test
    public void runServer() throws InterruptedException {
        bind(12342);
    }
}

运行结果

服务端

netty protocol server@12342
[send] login response :NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}), body=0) , body:0
[recv] server received heartbeat from client ---> NettyMessage(header=Header(crcCode=-1410399999, length=86, sessionId=0, type=5, priority=0, attachment={}), body=heartbeat @2020-09-04T16:19:58.102 ...)
[send] server send heartbeat resp to client ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=6, priority=0, attachment={}), body=heartbeat server resp @2020-09-04T16:19:58.118)
[recv] server received heartbeat from client ---> NettyMessage(header=Header(crcCode=-1410399999, length=86, sessionId=0, type=5, priority=0, attachment={}), body=heartbeat @2020-09-04T16:20:03.089 ...)
[send] server send heartbeat resp to client ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=6, priority=0, attachment={}), body=heartbeat server resp @2020-09-04T16:20:03.091)
[recv] server received heartbeat from client ---> NettyMessage(header=Header(crcCode=-1410399999, length=86, sessionId=0, type=5, priority=0, attachment={}), body=heartbeat @2020-09-04T16:20:08.090 ...)
[send] server send heartbeat resp to client ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=6, priority=0, attachment={}), body=heartbeat server resp @2020-09-04T16:20:08.091)

客户端

client connect @127.0.0.1:12342
login auth req handler active ...
[send] client send login req ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}), body=null)
[recv] client receive login resp ---> NettyMessage(header=Header(crcCode=-1410399999, length=45, sessionId=0, type=4, priority=0, attachment={}), body=0)
handshake success.
[recv] client receive server login resp, start heartbeat ---> NettyMessage(header=Header(crcCode=-1410399999, length=45, sessionId=0, type=4, priority=0, attachment={}), body=0)
[send] client send heartbeat to server ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=5, priority=0, attachment={}), body=heartbeat @2020-09-04T16:19:58.102 ...)
[recv] client receive server heartbeat message ---> NettyMessage(header=Header(crcCode=-1410399999, length=94, sessionId=0, type=6, priority=0, attachment={}), body=heartbeat server resp @2020-09-04T16:19:58.118)
[send] client send heartbeat to server ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=5, priority=0, attachment={}), body=heartbeat @2020-09-04T16:20:03.089 ...)
[recv] client receive server heartbeat message ---> NettyMessage(header=Header(crcCode=-1410399999, length=94, sessionId=0, type=6, priority=0, attachment={}), body=heartbeat server resp @2020-09-04T16:20:03.091)
[send] client send heartbeat to server ---> NettyMessage(header=Header(crcCode=-1410399999, length=0, sessionId=0, type=5, priority=0, attachment={}), body=heartbeat @2020-09-04T16:20:08.090 ...)
[recv] client receive server heartbeat message ---> NettyMessage(header=Header(crcCode=-1410399999, length=94, sessionId=0, type=6, priority=0, attachment={}), body=heartbeat server resp @2020-09-04T16:20:08.091)
client
server

总结

以上就是本期的内容,netty确实在网络处理方面比较强大,能够比较容易地在其基础上实现各种网络组件,比自己从头开发要方便很多。本期也参考了书本和网络的知识(书之前吐槽过),在官方的github没有找到本期内容的示例,有篇文章但功能不是很完善。后面会给到所有的参考链接。

参考资料

[1]. netty example
[2]. 《netty权威指南》私有协议

相关文章

  • netty实现通过私有协议的通信和探活

    背景 我们知道类似HTTP协议属于一种网络规范,包括数据结构的定义和通信规则。很多情况下,业务可能基于自身的需要,...

  • Netty私有协议栈开发

    简介 本文是整理《netty权威指南》第十二章:私有协议栈开发,方便后续自己实现网络通信协议的参考。 几个基本功能...

  • Netty入门

    Netty是一个异步的事件驱动网络框架,使用Netty可以研发高性能的私有协议,将业务逻辑和网络进行解耦,通过Ne...

  • 技术帖 | dubbo源码分析 -- 远程通信 netty

    dubbo 底层通信选择了 netty 这个 nio 框架做为默认的网络通信框架并且通过自定义协议进行通信。dub...

  • rocketmq-4.4.0从零单排(详细参数与配置)

    rocketmq采用netty作为底层通信框架,其中具体技术细节如下: 1、通信协议 rocketmq通信协议采用...

  • 15 Netty编解码框架

    1 Netty编解码框架 在上一节中,我们提到TCP的粘包、拆包问题,可以通过自定义通信协议的方式来解决。通信协议...

  • netty自定义协议(私有协议栈)

    此文参考Netty权威指南《私有协议栈开发》章节 1.概述 绝大多数的私有协议栈开发都是基于TCP/IP协议,所以...

  • TcpClient

    参考:Netty工作原理 Rpc通信,无非就是通过网络层的一些可靠协议进行数据传输的!这里我们的协议是tcp协议,...

  • k8s网络之calico学习

    一、知识准备 1.calico主要通过ipip协议与bgp协议来实现通信。前者通过ipip隧道作为通信基础,后者则...

  • Netty框架服务端感知客户端状态——IdleStateHand

    前段时间用Netty搭了个Mqtt broker,初步实现了端到端的通信,Mqtt协议是基于发布/订阅(publi...

网友评论

      本文标题:netty实现通过私有协议的通信和探活

      本文链接:https://www.haomeiwen.com/subject/crazsktx.html