美文网首页
进程重启端口号冲突了,太坑了吧

进程重启端口号冲突了,太坑了吧

作者: 左小星 | 来源:发表于2020-04-25 17:15 被阅读0次

  相信大家在日常工作中会遇到这种情况,一台机器上两个进程通过socket相互通信,server端重启后端口冲突直接起不来了...此时rd同学心里一紧,是不是写出bug了赶紧查日志发现是端口冲突了笔者目前在猫眼从事servicemesh相关研发工作,我们公司使用新美大的机器,无法拥有对机器的控制权,因此也就无法基于k8s来做servicemesh。简单来说,sdk通过http的方式与mesh交互做服务注册发现,那sdk如何发现mesh 就成了一个问题。

image

  如上图所示,最开始sdk通过本地文件发现mesh的admin地址,mesh重启后为防止端口冲突问题,会选一个未占用的端口进行监听,然后把最新的admin地址写到本地文件。sdk要能及时的刷新mesh最新的admin地址,所以sdk内部要有一个定时任务定期刷新admin地址。由此来看sdk的逻辑就变得非常重了,sdk本身的定位就是序列化和反序列化数据发送给mesh。那么有什么好的方法能解决上面这么恶心的问题吗?当然有啦,通过uds就可以。
  Unix domain socket 又叫 IPC(inter-process communication 进程间通信) socket,用于实现同一主机上的进程间通信。socket 原本是为网络通讯设计的,但后来在 socket 的框架上发展出一种 IPC 机制,就是 UNIX domain socket。虽然网络 socket 也可用于同一台主机的进程间通讯(通过 loopback 地址 127.0.0.1),但是 UNIX domain socket 用于 IPC 更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC 机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。有了uds进行本机通信,再也不用担心mesh重启或启动时端口冲突的问题了。使用uds之后sdk与mesh的交互方式。这样sdk直接拿到uds路径new uds客户端调用mesh就可以了,很方便

image

  sdk的语言有多种,我拿Java来举例。mesh 使用go语言实现,很容易实现一个应用层http协议传输层uds协议的server。

func main() {
  RunServer()
}
​
var (
  // 声明 Unix 套接字的地址
  serverAddr = &net.UnixAddr{Name: "/opt/test.sock", Net: "unix"}
)
​
func RunServer() {
  // unlink 系统调用比较特殊。关于它的描述中有一点:如果这个文件是一个 unix socket,它会被移除,但是打开它的进程可以继续使用它。也就是说新旧进程都会在这个地址监听。
  syscall.Unlink(serverAddr.Name)
  lis, err := net.ListenUnix("unix", serverAddr)
  if err != nil {
    fmt.Println("ListenUnix", err)
    return
  }
  http.HandleFunc("/get", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    query := req.URL.Query()
    get := query.Get("key")
    fmt.Printf("server get key = %s value = %s \n", "key", get)
    builder := strings.Builder{}
    for i := 0; i < 3; i++ {
      builder.WriteString(strconv.Itoa(i))
    }
    s := req.Header.Get("sequenceid")
    w.Header().Add("sequenceid", s)
    w.Write([]byte(builder.String()))
  }))
​
  http.HandleFunc("/post", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    bytes, err := ioutil.ReadAll(req.Body)
    req.Body.Close()
    if err != nil {
      return
    }
    h := req.Header.Get("sequenceid")
    contentType := req.Header.Get("Content-Type")
    fmt.Println(contentType)
    w.Header().Add("sequenceid", h)
    fmt.Printf("server post receive request %s \n", string(bytes))
    w.Write([]byte("post success"))
  }))
  svr := &http.Server{Handler: http.DefaultServeMux}
  err = svr.Serve(lis)
  if err != nil {
    fmt.Println("Serve err:", err)
  }
}
​

  而Java语言就相对麻烦了,在做这一块的时候在网上调研没有找到什么资料,最终使用netty封装了一个HTTPUdsClient的包。因为netty是异步的,所以要把异步转同步,这里只是给出一个简单的demo并没有转同步。那么异步转同步如何实现呢?我的做法是在http header中添加Sequenceid,channel中发完数据后使用CountDownLatch wait 等待,当mesh 返回数据后解码进入到Handler中,触发CountDownLatch 的countDown操作,很容易就异步转同步了。还有一点说明:每次http 响应收到后都会把uds连接关闭掉。

public class NettyUdsHttpClient {

    public static void main(String[] args) throws Exception {
        final NettyUdsHttpClient nettyUdsHttpClient = new NettyUdsHttpClient();
        nettyUdsHttpClient.request("/opt/test.sock");
    }

    private Bootstrap b = null;

    private static EventLoopGroup workerGroup = null;

    public NettyUdsHttpClient() {
        EventLoopGroup workerGroup = null;
        Class domainSocketChannelClazz = null;
        if (Epoll.isAvailable()) {
            domainSocketChannelClazz = EpollDomainSocketChannel.class;
            System.out.println("Epoll.isAvailable");
            workerGroup = new EpollEventLoopGroup(1);
        } else if (KQueue.isAvailable()) {
            System.out.println("KQueue.isAvailable");
            workerGroup = new KQueueEventLoopGroup(1);
            domainSocketChannelClazz = KQueueDomainSocketChannel.class;
        } else {
            System.out.println("use NioEventLoopGroup");
            workerGroup = new NioEventLoopGroup(1);
            domainSocketChannelClazz = NioSocketChannel.class;
        }

        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(domainSocketChannelClazz);
        b.option(ChannelOption.SO_KEEPALIVE, false);
        b.handler(new ChannelInitializer<DomainSocketChannel>() {
            @Override
            public void initChannel(DomainSocketChannel ch) throws Exception {
                // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
                ch.pipeline().addLast(new HttpResponseDecoder());

                // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
                ch.pipeline().addLast(new HttpRequestEncoder());
                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {

                    private Map<String, String> headerMap = new HashMap<>();

                    private int statusCode;

                    private StringBuilder contentStr = new StringBuilder();

                    private int currentSequenceId;

                    @Override
                    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                        if (msg instanceof HttpResponse) {
                            DefaultHttpResponse response = (DefaultHttpResponse) msg;
                            this.statusCode = response.status().code();
                            HttpHeaders headers = response.headers();
                            Integer sequenceId = headers.getInt("Sequenceid");
                            if (sequenceId != null) {
                                this.currentSequenceId = sequenceId;
                            }
                            Iterator<Map.Entry<String, String>> headerIterator = headers.iteratorAsString();
                            // 封装header
                            while (headerIterator.hasNext()) {
                                Map.Entry<String, String> header = headerIterator.next();
                                headerMap.put(header.getKey(), header.getValue());
                            }
                        }
                        // 和mesh交互,没有Trailer,因此不考虑
                        if (msg instanceof HttpContent) {
                            HttpContent content = (HttpContent) msg;
                            contentStr.append(content.content().toString(StandardCharsets.UTF_8));
                            if (msg instanceof LastHttpContent) {
                                // http 响应已经读完
                                System.out.println("currentSequenceId = " + currentSequenceId + "响应码 = " + statusCode + " headerMap = " + headerMap + " content = " + contentStr.toString());
                                ctx.channel().close();
                            }
                        }
                    }
                });
            }
        });
        this.b = b;
        this.workerGroup = workerGroup;
    }

    public void request(String path) throws Exception {
        try {
            // Start the client.
            ChannelFuture f = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();

            // get 请求
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                    "/get?key=123", Unpooled.EMPTY_BUFFER);

            request.headers().set("Sequenceid", 1);
            request.headers().set(HttpHeaderNames.HOST, "daemon");
            // 发送http请求
            f.channel().writeAndFlush(request);

            // post 请求
            ChannelFuture f1 = b.connect(new DomainSocketAddress(path)).syncUninterruptibly();

            String msg = "hello";
            ByteBuf byteBuf = Unpooled.wrappedBuffer(msg.getBytes("UTF-8"));

            DefaultFullHttpRequest request1 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/post", byteBuf);
            request1.headers().set(HttpHeaderNames.HOST, "daemon");
            request1.headers().set("Sequenceid", 2);
            request1.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
            request1.headers().set(HttpHeaderNames.CONTENT_LENGTH, request1.content().readableBytes());
            f1.channel().writeAndFlush(request1);
            System.out.println("over ");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

  其实在工作中很容易遇到一些比较痛苦的事情,这时候如何考虑优化掉这个事情就很重要了,等解决一个非常棘手的事情就会有成就感。感谢您的阅读,如果感觉我写的还行,求关注~

相关文章

网友评论

      本文标题:进程重启端口号冲突了,太坑了吧

      本文链接:https://www.haomeiwen.com/subject/qbpkwhtx.html