背景
dubbo自带的运维工具dubbo-admin,主要面向开发人员去管理服务,携带很多管理、控制等功能,然后在dubbo新版本又推出了qos(Quality of Service),主要面向运维管理。我在之前公司有用到次功能,在和k8s结合时,通过http发送主动下线功能(下线注册,但不下线服务),等到流量完全停止,在下线pod,实现平滑发布。
疑问
怎么样去管理?
分析
dubbo通过 QosProtocolWrapper 这个包装器实现qos发布,QosProtocolWrapper 是Protocol 三大包装器(filter,listener,qos)其中之一,默认会开启qos功能,可以配置关闭
<dubbo:application name="dubbo-consumer">
<dubbo:parameter key="qos.enable" value="false"/>
</dubbo:application>
qos 主要提供了ls,online,offline,help 功能,具体说,只有三种,上下线服务和查看服务
命令 | 作用 |
---|---|
ls | 能够列出来该实例服务提供者与调用者状态 |
online | 服务上线,可以指定某个接口,也可以什么也不指定,这样就是全部 |
offline | 服务下线,可以指定某个接口,也可以什么也不指定,这样就是全部 |
help | 查看命令的用途,不带参数显示全部命令,带参数只显示指定的 |
实现
我们跟读一下源码,看看qos 服务的启动,请求处理,上下线等。
//org.apache.dubbo.qos.protocol.QosProtocolWrapper#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
//org.apache.dubbo.qos.protocol.QosProtocolWrapper#refer
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
startQosServer(url);
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}
private static AtomicBoolean hasStarted = new AtomicBoolean(false);
//org.apache.dubbo.qos.protocol.QosProtocolWrapper#startQosServer
private void startQosServer(URL url) {
if (!hasStarted.compareAndSet(false, true)) {//cas保证启动一次
return;
}
boolean qosEnable = url.getParameter(QOS_ENABLE, true);
if (!qosEnable) { return; }
int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
Server server = Server.getInstance();
server.setPort(port);
server.setAcceptForeignIp(acceptForeignIp);
server.start();
}
在dubbo 生产者服务暴露和消费者消费引用的过程中都会启动qos,并且qos 通过cas来保证一个jvm只启动一次。
public void start() throws Throwable {
if (!started.compareAndSet(false, true)) {return;}
boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
}
});
serverBootstrap.bind(port).sync();
}
同样qos 功能也是通过netty启动server,处理类指定为 QosProcessHandler,这个handler实现了netty的 ByteToMessageDecoder 可以将网络流中的字节解码为对象
public static final String dubbo =
" ___ __ __ ___ ___ ____ " + System.lineSeparator() +
" / _ \\ / / / // _ ) / _ ) / __ \\ " + System.lineSeparator() +
" / // // /_/ // _ |/ _ |/ /_/ / " + System.lineSeparator() +
"/____/ \\____//____//____/ \\____/ " + System.lineSeparator();
public class QosProcessHandler extends ByteToMessageDecoder {
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
welcomeFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (welcome != null) {
ctx.write(Unpooled.wrappedBuffer(welcome.getBytes()));
ctx.writeAndFlush(Unpooled.wrappedBuffer(prompt.getBytes()));
}
}
}, 500, TimeUnit.MILLISECONDS);
}
channelActive() 方法含有为连接建立的时候回调,这里有个定时任务500ms,会刷一个美体的dubbo给客户端,我们验证下。
我们看看 QosProcessHandler#decode 是怎么处理请求的。
//org.apache.dubbo.qos.server.handler.QosProcessHandler#decode
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 1) {
return;
}
final int magic = in.getByte(in.readerIndex());
ChannelPipeline p = ctx.pipeline();
p.addLast(new LocalHostPermitHandler(acceptForeignIp));
if (isHttp(magic)) {
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpProcessHandler());
p.remove(this);
} else {
p.addLast(new LineBasedFrameDecoder(2048));
p.addLast(new IdleStateHandler(0, 0, 5 * 60));
p.addLast(new TelnetProcessHandler());
p.remove(this);
}
}
private static boolean isHttp(int magic) {
return magic == 'G' || magic == 'P';
}
上面方法讲究,先读取第一个字节判断请求是http 还是 tcp,为什么用第一个字节呢,我们知道http信息头开始为 GET /xx 或者 POST /xx,第一个字符要么G要么P,判断为http 则用http编解码,如果为tcp 则用 LineBasedFrameDecoder 编解码,这是一个换行分割读取的解码方式遇到(\n,\r)[就是telnet时候的回车]时,就截断,如果为tcp 还会添加一个 IdleStateHandler 作为心跳检测,最后处理指令的handler 为 TelnetProcessHandler。
先演示下效果
为了便于观察我们这里看http的处理指令的方式 HttpProcessHandler。
//org.apache.dubbo.qos.server.handler.HttpProcessHandler#channelRead0
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
CommandContext commandContext = HttpCommandDecoder.decode(msg);
// return 404 when fail to construct command context
commandContext.setRemote(ctx.channel());
try {
String result = commandExecutor.execute(commandContext);
FullHttpResponse response = http200(result);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
//org.apache.dubbo.qos.command.DefaultCommandExecutor#execute
public String execute(CommandContext commandContext) throws NoSuchCommandException {
BaseCommand command = null;
command = ExtensionLoader.getExtensionLoader(BaseCommand.class).getExtension(commandContext.getCommandName());
if (command == null) {
throw new NoSuchCommandException(commandContext.getCommandName());
}
return command.execute(commandContext, commandContext.getArgs());
}
HttpCommandDecoder.decode(msg)会将get或者post请求携带的路径等返回给 commandContext, BaseCommand.class 为指令扩展点会根据uri 传入的指令,来指定要处理的类,优点类似策略模式。我们看看offline 是怎么处理的
//org.apache.dubbo.qos.command.impl.Offline#execute
public String execute(CommandContext commandContext, String[] args) {
String servicePattern = ".*";
if (args != null && args.length > 0) {
servicePattern = args[0];
}
boolean hasService = false;
Collection<ProviderModel> providerModelList = ApplicationModel.allProviderModels();
for (ProviderModel providerModel : providerModelList) {
if (providerModel.getServiceName().matches(servicePattern)) {
hasService = true;
Set<ProviderInvokerWrapper> providerInvokerWrapperSet = ProviderConsumerRegTable.getProviderInvoker(providerModel.getServiceName());
for (ProviderInvokerWrapper providerInvokerWrapper : providerInvokerWrapperSet) {
if (!providerInvokerWrapper.isReg()) {
continue;
}
Registry registry = registryFactory.getRegistry(providerInvokerWrapper.getRegistryUrl());
registry.unregister(providerInvokerWrapper.getProviderUrl());
providerInvokerWrapper.setReg(false);
}
}
}
if (hasService) {
return "OK";
} else {
return "service not found";
}
}
//org.apache.dubbo.registry.support.FailbackRegistry#unregister
public void unregister(URL url) {
super.unregister(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a cancellation request to the server side
doUnregister(url);
}
//org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
public void doUnregister(URL url) {
zkClient.delete(toUrlPath(url));
}
可以传入服务,默认所有服务,18行中从注册工厂中获取服务对应的注册中心,然后调用注册中心的unregister() 最后层层调用到zk客户端的delete()方法来,删除zk临时节点。
总结
qos 的功能和简单,之所以单独拿出来讲是因为这里涵盖了我们web开发中常提到的“http服务器”概念,通过netty 启动服务,然后处理请求。
网友评论