美文网首页
dubbo系列之-qos运维-2021-01-17

dubbo系列之-qos运维-2021-01-17

作者: five_year | 来源:发表于2021-01-17 17:04 被阅读0次

背景

dubbo自带的运维工具dubbo-admin,主要面向开发人员去管理服务,携带很多管理、控制等功能,然后在dubbo新版本又推出了qos(Quality of Service),主要面向运维管理。我在之前公司有用到次功能,在和k8s结合时,通过http发送主动下线功能(下线注册,但不下线服务),等到流量完全停止,在下线pod,实现平滑发布。

疑问

怎么样去管理?

分析

dubbo通过 QosProtocolWrapper 这个包装器实现qos发布,QosProtocolWrapper 是Protocol 三大包装器(filter,listener,qos)其中之一,默认会开启qos功能,可以配置关闭

<dubbo:application name="dubbo-consumer">
<dubbo:parameter key="qos.enable" value="false"/>
</dubbo:application>

qos 主要提供了ls,online,offline,help 功能,具体说,只有三种,上下线服务和查看服务

命令 作用
ls 能够列出来该实例服务提供者与调用者状态
online 服务上线,可以指定某个接口,也可以什么也不指定,这样就是全部
offline 服务下线,可以指定某个接口,也可以什么也不指定,这样就是全部
help 查看命令的用途,不带参数显示全部命令,带参数只显示指定的

实现

我们跟读一下源码,看看qos 服务的启动,请求处理,上下线等。

//org.apache.dubbo.qos.protocol.QosProtocolWrapper#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}
//org.apache.dubbo.qos.protocol.QosProtocolWrapper#refer
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        startQosServer(url);
        return protocol.refer(type, url);
    }
    return protocol.refer(type, url);
}
private static AtomicBoolean hasStarted = new AtomicBoolean(false);
//org.apache.dubbo.qos.protocol.QosProtocolWrapper#startQosServer
private void startQosServer(URL url) {
    if (!hasStarted.compareAndSet(false, true)) {//cas保证启动一次
        return;
    }
    boolean qosEnable = url.getParameter(QOS_ENABLE, true);
    if (!qosEnable) {  return;  }

    int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
    boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
    Server server = Server.getInstance();
    server.setPort(port);
    server.setAcceptForeignIp(acceptForeignIp);
    server.start();
}

在dubbo 生产者服务暴露和消费者消费引用的过程中都会启动qos,并且qos 通过cas来保证一个jvm只启动一次。

public void start() throws Throwable {
    if (!started.compareAndSet(false, true)) {return;}
    boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
    worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(boss, worker);
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
        }
    });
    serverBootstrap.bind(port).sync();
}

同样qos 功能也是通过netty启动server,处理类指定为 QosProcessHandler,这个handler实现了netty的 ByteToMessageDecoder 可以将网络流中的字节解码为对象

public static final String dubbo =
                "   ___   __  __ ___   ___   ____     " + System.lineSeparator() +
                "  / _ \\ / / / // _ ) / _ ) / __ \\  " + System.lineSeparator() +
                " / // // /_/ // _  |/ _  |/ /_/ /    " + System.lineSeparator() +
                "/____/ \\____//____//____/ \\____/   " + System.lineSeparator();

public class QosProcessHandler extends ByteToMessageDecoder {
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        welcomeFuture = ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                if (welcome != null) {
                    ctx.write(Unpooled.wrappedBuffer(welcome.getBytes()));
                    ctx.writeAndFlush(Unpooled.wrappedBuffer(prompt.getBytes()));
                }
            }

        }, 500, TimeUnit.MILLISECONDS);
    }

channelActive() 方法含有为连接建立的时候回调,这里有个定时任务500ms,会刷一个美体的dubbo给客户端,我们验证下。

image

我们看看 QosProcessHandler#decode 是怎么处理请求的。

//org.apache.dubbo.qos.server.handler.QosProcessHandler#decode
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 1) {
        return;
    }
    final int magic = in.getByte(in.readerIndex());
    ChannelPipeline p = ctx.pipeline();
    p.addLast(new LocalHostPermitHandler(acceptForeignIp));
    if (isHttp(magic)) {
        p.addLast(new HttpServerCodec());
        p.addLast(new HttpObjectAggregator(1048576));
        p.addLast(new HttpProcessHandler());
        p.remove(this);
    } else {
        p.addLast(new LineBasedFrameDecoder(2048));
        p.addLast(new IdleStateHandler(0, 0, 5 * 60));
        p.addLast(new TelnetProcessHandler());
        p.remove(this);
    }
}

private static boolean isHttp(int magic) {
    return magic == 'G' || magic == 'P';
}

上面方法讲究,先读取第一个字节判断请求是http 还是 tcp,为什么用第一个字节呢,我们知道http信息头开始为 GET /xx 或者 POST /xx,第一个字符要么G要么P,判断为http 则用http编解码,如果为tcp 则用 LineBasedFrameDecoder 编解码,这是一个换行分割读取的解码方式遇到(\n,\r)[就是telnet时候的回车]时,就截断,如果为tcp 还会添加一个 IdleStateHandler 作为心跳检测,最后处理指令的handler 为 TelnetProcessHandler。

先演示下效果

image image

为了便于观察我们这里看http的处理指令的方式 HttpProcessHandler。

//org.apache.dubbo.qos.server.handler.HttpProcessHandler#channelRead0
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
    CommandContext commandContext = HttpCommandDecoder.decode(msg);
    // return 404 when fail to construct command context
    commandContext.setRemote(ctx.channel());
    try {
        String result = commandExecutor.execute(commandContext);
        FullHttpResponse response = http200(result);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    } 
}
//org.apache.dubbo.qos.command.DefaultCommandExecutor#execute
public String execute(CommandContext commandContext) throws NoSuchCommandException {
    BaseCommand command = null;
    command = ExtensionLoader.getExtensionLoader(BaseCommand.class).getExtension(commandContext.getCommandName());
    if (command == null) {
        throw new NoSuchCommandException(commandContext.getCommandName());
    }
    return command.execute(commandContext, commandContext.getArgs());
}

HttpCommandDecoder.decode(msg)会将get或者post请求携带的路径等返回给 commandContext, BaseCommand.class 为指令扩展点会根据uri 传入的指令,来指定要处理的类,优点类似策略模式。我们看看offline 是怎么处理的

image
//org.apache.dubbo.qos.command.impl.Offline#execute
public String execute(CommandContext commandContext, String[] args) {
    String servicePattern = ".*";
    if (args != null && args.length > 0) {
        servicePattern = args[0];
    }
    boolean hasService = false;

    Collection<ProviderModel> providerModelList = ApplicationModel.allProviderModels();
    for (ProviderModel providerModel : providerModelList) {
        if (providerModel.getServiceName().matches(servicePattern)) {
            hasService = true;
            Set<ProviderInvokerWrapper> providerInvokerWrapperSet = ProviderConsumerRegTable.getProviderInvoker(providerModel.getServiceName());
            for (ProviderInvokerWrapper providerInvokerWrapper : providerInvokerWrapperSet) {
                if (!providerInvokerWrapper.isReg()) {
                    continue;
                }
                Registry registry = registryFactory.getRegistry(providerInvokerWrapper.getRegistryUrl());
                registry.unregister(providerInvokerWrapper.getProviderUrl());
                providerInvokerWrapper.setReg(false);
            }
        }
    }
    if (hasService) {
        return "OK";
    } else {
        return "service not found";
    }
}
//org.apache.dubbo.registry.support.FailbackRegistry#unregister
public void unregister(URL url) {
    super.unregister(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a cancellation request to the server side
        doUnregister(url);
 }

//org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
public void doUnregister(URL url) {
   zkClient.delete(toUrlPath(url));
}

可以传入服务,默认所有服务,18行中从注册工厂中获取服务对应的注册中心,然后调用注册中心的unregister() 最后层层调用到zk客户端的delete()方法来,删除zk临时节点。

总结

qos 的功能和简单,之所以单独拿出来讲是因为这里涵盖了我们web开发中常提到的“http服务器”概念,通过netty 启动服务,然后处理请求。

相关文章

网友评论

      本文标题:dubbo系列之-qos运维-2021-01-17

      本文链接:https://www.haomeiwen.com/subject/sqtzaktx.html