美文网首页
dapeng 调用

dapeng 调用

作者: 偶像本人 | 来源:发表于2018-02-06 11:49 被阅读0次

    dapeng调用过程

    标签(空格分隔): dapeng


    1、服务端启动(主要介绍 spring、zookeeper、netty 这几个插件,服务端只要启动这三个,就能完成与客户端的通信)。容器加载完各个classloader之后,开始启动各个插件。

    • SpringPlugin:拿到service.xml,通过spring的ClassPathXmlApplicationContext加载到相应processor(service的具体信息,包括接口,接口实现,对应的方法,同步异步)这个解析的过程详细可以看dapeng项目的dapeng-spring模块。加载完后注册到容器中。
    /**
     * Creates the {@link SoaServiceDefinition} for the wrapped service bean.
     *
     * <p>The interfaces implemented by {@code serviceRef}'s class are scanned and only
     * those annotated with both {@code @Service} and {@code @Processor} are kept; the
     * last match is used. The class named by {@code @Processor.className()} is loaded
     * through that interface's class loader and instantiated reflectively through its
     * {@code (serviceInterface, Class)} constructor.
     *
     * @return the processor instance built for the service bean
     * @throws RuntimeException if no implemented interface carries both annotations
     * @throws Exception if the processor class cannot be loaded or instantiated
     */
    @Override
    @SuppressWarnings("unchecked")
    public SoaServiceDefinition<?> getObject() throws Exception {
        // Candidate interfaces: must be annotated with @Service AND @Processor.
        final List<Class<?>> annotated = Arrays.stream(serviceRef.getClass().getInterfaces())
                .filter(candidate -> candidate.isAnnotationPresent(Service.class))
                .filter(candidate -> candidate.isAnnotationPresent(Processor.class))
                .collect(toList());

        if (annotated.isEmpty()) {
            throw new RuntimeException("not config @Service & @Processor in " + refId);
        }

        // When several interfaces qualify, the last one declared wins.
        final Class<?> serviceInterface = annotated.get(annotated.size() - 1);
        final String processorClassName = serviceInterface.getAnnotation(Processor.class).className();

        final Class<?> processorType = Class.forName(processorClassName, true, serviceInterface.getClassLoader());
        final Constructor<?> factory = processorType.getConstructor(serviceInterface, Class.class);

        return (SoaServiceDefinition) factory.newInstance(serviceRef, serviceInterface);
    }
    
    • zookeeper:
      连接zookeeper,zookeeper从容器获取服务信息,注册服务(/soa/runtime/services/com.isuwang.soa.order.service.EnquiryPriceService data:"10.117.17.97:9084:1.0.0")
      /**
       * Registers this container's instance of a service in ZooKeeper.
       *
       * <p>First writes the runtime node
       * {@code /soa/runtime/services/<serverName>/<ip>:<port>:<versionName>} with empty
       * data, then creates the "current" node for the service key: on the master helper
       * when {@code SOA_ZOOKEEPER_MASTER_ISCONFIG} is set, otherwise on the default
       * helper. Any exception is logged and not propagated.
       *
       * @param serverName  service name used in the node path and in the key
       * @param versionName service version used in the node path and in the key
       */
      @Override
      public void registerService(String serverName, String versionName) {
          try {
              // Register the service info under the runtime node.
              final String serviceRoot = "/soa/runtime/services/" + serverName + "/";
              final String runtimePath = serviceRoot + SoaSystemEnvProperties.SOA_CONTAINER_IP + ":" + SoaSystemEnvProperties.SOA_CONTAINER_PORT + ":" + versionName;
              zooKeeperHelper.addOrUpdateServerInfo(runtimePath, "");

              // Register the service info on the master node and take part in master election.
              // TODO: the election mechanism needs to be improved later.
              if (!SoaSystemEnvProperties.SOA_ZOOKEEPER_MASTER_ISCONFIG) {
                  zooKeeperHelper.createCurrentNode(ZookeeperHelper.generateKey(serverName, versionName));
              } else {
                  zooKeeperMasterHelper.createCurrentNode(ZookeeperHelper.generateKey(serverName, versionName));
              }
          } catch (Exception failure) {
              LOGGER.error(failure.getMessage(), failure);
          }
      }
    
    • nettyPlugin:开放的端口和zookeeper注册的端口一致,主要的处理在pipeline()
        /**
         * Starts the Netty server on a dedicated thread named {@code NettyContainer-Thread}.
         *
         * <p>That thread configures the {@link ServerBootstrap}, binds it to {@code port},
         * and then blocks until the server channel is closed. Both event loop groups are
         * shut down gracefully when the thread leaves the try block, whether normally or
         * because of an exception.
         */
        @Override
        public void start() {
            LOGGER.warn("Plugin::NettyPlugin start");
            LOGGER.info("Bind Local Port {} [Netty]", port);

            new Thread("NettyContainer-Thread") {
                @Override
                public void run() {
                    try {
                        bootstrap = new ServerBootstrap();

                        bootstrap.group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    protected void initChannel(SocketChannel ch) throws Exception {
                                        ch.pipeline().addLast(new IdleStateHandler(15, 0, 0), // idle/timeout settings
                                                new SoaDecoder(), // handles sticky and fragmented packets
                                                new SoaIdleHandler(),  // heartbeat handling
                                                new SoaServerHandler(container));  // request invocation handling
                                    }
                                })
                                .option(ChannelOption.SO_BACKLOG, 1024)
                                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // reuse previously allocated memory (PooledByteBuf -> ByteBuf)
                                .childOption(ChannelOption.SO_KEEPALIVE, true)
                                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

                        // Start the server.
                        ChannelFuture f = bootstrap.bind(port).sync();

                        // Wait until the connection is closed.
                        f.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        LOGGER.error(e.getMessage(), e);
                        // Restore the interrupt status so that code running later on this
                        // thread can still observe that an interruption was requested.
                        Thread.currentThread().interrupt();
                    } finally {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    }
                }
            }.start();
        }
    

    2、生成代码的结构(生成domain,enums,service,同步异步的client,codec,xml文件)

    • domain:定义的实体类
    • enums:定义的枚举
    • client:
    public class AdminCacheServiceClient implements AdminCacheService {
       private final String serviceName;
       private final String version;
    
       private SoaConnectionPool pool;
    
       /**
        * Creates a client bound to service
        * {@code com.isuwang.soa.admin.service.AdminCacheService}, version {@code 1.0.0}.
        *
        * <p>The connection pool is taken from the first {@code SoaConnectionPoolFactory}
        * discovered through {@link ServiceLoader}; the service name and version are then
        * registered with that pool.
        *
        * @throws IllegalStateException if no factory is discovered or the first factory
        *         returns no pool (previously this surfaced as a bare NullPointerException)
        */
       public AdminCacheServiceClient() {
           this.serviceName = "com.isuwang.soa.admin.service.AdminCacheService";
           this.version = "1.0.0";

           // Only the first discovered factory is consulted.
           SoaConnectionPool resolvedPool = null;
           ServiceLoader<SoaConnectionPoolFactory> factories = ServiceLoader.load(SoaConnectionPoolFactory.class);
           for (SoaConnectionPoolFactory factory : factories) {
               resolvedPool = factory.getPool();
               break;
           }

           // Fail with an explicit message instead of an NPE on the registerClientInfo call.
           if (resolvedPool == null) {
               throw new IllegalStateException("No SoaConnectionPool available for " + serviceName + ":" + version
                       + " (no SoaConnectionPoolFactory provider found, or it returned null)");
           }

           this.pool = resolvedPool;
           this.pool.registerClientInfo(serviceName, version);
       }
       
       /**
        * Client-side stub for {@code findStaffNamesByIds}.
        *
        * <p>Wraps {@code staffIds} in the request struct, sends it through the connection
        * pool together with the request/response serializers, and returns the
        * {@code success} value of the response struct.
        *
        * @param staffIds ids placed into the request struct
        * @return the value of the response's {@code success} field
        * @throws SoaException as declared; presumably raised when the remote call fails
        */
       public java.util.Map<Integer, String> findStaffNamesByIds(java.util.Set<Integer> staffIds) throws SoaException {
           final String methodName = "findStaffNamesByIds";

           final findStaffNamesByIds_args request = new findStaffNamesByIds_args();
           request.setStaffIds(staffIds);

           final findStaffNamesByIds_result reply = pool.send(
                   serviceName,
                   version,
                   methodName,
                   request,
                   new FindStaffNamesByIds_argsSerializer(),
                   new FindStaffNamesByIds_resultSerializer());

           return reply.getSuccess();
       }
    
    • service:定义的接口
    /**
     * Service contract published as
     * {@code com.isuwang.soa.admin.service.AdminCacheService}, version {@code 1.0.0}.
     * The {@code @Processor} annotation names the codec processor class that is
     * associated with this interface. Every method declares {@code SoaException}.
     */
    @Service(name = "com.isuwang.soa.admin.service.AdminCacheService", version = "1.0.0")
    @Processor(className = "com.isuwang.soa.admin.AdminCacheServiceCodec$Processor")
    public interface AdminCacheService {
    
       /** Presumably maps each given staff id to the staff member's name -- confirm with the implementation. */
       java.util.Map<Integer, String> findStaffNamesByIds(java.util.Set<Integer> staffIds) throws com.github.dapeng.core.SoaException;
       
       /** Presumably returns the name of the staff member with the given id -- confirm with the implementation. */
       String findStaffName(Integer staffId) throws com.github.dapeng.core.SoaException;
       
       /** Returns a {@code TStaffCache} for the given staff id. */
       com.isuwang.soa.admin.domain.TStaffCache getEntity(Integer staffId) throws com.github.dapeng.core.SoaException;
       
       /** Presumably returns the ids of the staff belonging to the given organization and its descendants -- confirm. */
       java.util.Set<Integer> getDescendantStaffs(Integer orgId) throws com.github.dapeng.core.SoaException;
    }
    
    • codec:包含了参数对象,结果对象,对应的序列化器,processors
     /**
      * Synchronous function definition for {@code findStaffNamesByIds}.
      *
      * <p>Registers the method name together with its argument and result serializers,
      * and forwards the decoded arguments to the service implementation.
      *
      * @param <I> service implementation type, a subtype of {@code AdminCacheService}
      */
     public static class findStaffNamesByIds<I extends com.isuwang.soa.admin.service.AdminCacheService> extends SoaFunctionDefinition.Sync<I, findStaffNamesByIds_args, findStaffNamesByIds_result> {

         /** Binds the method name to its request/response serializers. */
         public findStaffNamesByIds() {
             super("findStaffNamesByIds", new FindStaffNamesByIds_argsSerializer(), new FindStaffNamesByIds_resultSerializer());
         }

         /**
          * Invokes {@code iface.findStaffNamesByIds} with the {@code staffIds} carried by
          * the argument struct and stores the return value in the result's
          * {@code success} field.
          */
         @Override
         public findStaffNamesByIds_result apply(I iface, findStaffNamesByIds_args findStaffNamesByIds_args) throws SoaException {
             final findStaffNamesByIds_result outcome = new findStaffNamesByIds_result();
             outcome.success = iface.findStaffNamesByIds(findStaffNamesByIds_args.staffIds);
             return outcome;
         }
     }
    

    3、客户端发送请求(调用service服务必须依赖 dapeng-netty-client,旧版本依赖的是 dapeng-remoting-client)

    request -> 建立connection -> 将请求序列化成字节流 -> 发送(channel.writeAndFlush(request))

    • 建立connection:client拿到SoaConnectionPool,连接zookeeper,根据服务名和版本号获取对应的连接信息(ip和端口,可能有多个),再通过负载均衡策略选择一个连接(策略:随机、轮询、最少活跃调用数、一致性Hash)
    • 处理请求(经过filter处理)
    • 序列化(报文格式如下:第一行为各字段占用的字节数,n 表示变长;第二行为对应的字段)
    4      | 1         | 1          | 1                 | 4      | n         | n       | 1
    length | stx(0x02) | version(1) | codec protocol(1) | seq(1) | soaHeader | request | etx(0x03)
       /**
        * Serializes the request into {@code buffer} and returns it.
        *
        * <p>Written in order: the STX byte, the VERSION byte, the codec protocol code
        * byte, the sequence id as a 32-bit int, the SOA header (via
        * {@code SoaHeaderSerializer}), the body (via {@code bodySerializer} using the
        * selected protocol), and finally the ETX byte. The transport is then flushed.
        *
        * <p>If {@code protocol} has not been set, it is taken from the current invocation
        * context, falling back to {@code CompressedBinary} when the context has none.
        *
        * @return the buffer backing the transport, holding the serialized request
        * @throws TException if the protocol is not Binary, CompressedBinary or Json;
        *         presumably also when a write fails
        */
       public ByteBuf build() throws TException {
           InvocationContext invocationCtx = InvocationContextImpl.Factory.getCurrentInstance();

           // Resolve the codec protocol: explicit value > invocation context > CompressedBinary.
           if (protocol == null) {
               if (invocationCtx.getCodecProtocol() == null) {
                   protocol = CodecProtocol.CompressedBinary;
               } else {
                   protocol = invocationCtx.getCodecProtocol();
               }
           }

           // Header section, always encoded with the binary protocol.
           final TSoaTransport transport = new TSoaTransport(buffer);
           final TBinaryProtocol headerProtocol = new TBinaryProtocol(transport);
           headerProtocol.writeByte(STX);
           headerProtocol.writeByte(VERSION);
           headerProtocol.writeByte(protocol.getCode());
           headerProtocol.writeI32(seqid);
           new SoaHeaderSerializer().write(header, headerProtocol);

           // Body section, encoded with the protocol selected above.
           final TProtocol bodyProtocol;
           if (protocol == CodecProtocol.Binary) {
               bodyProtocol = new TBinaryProtocol(transport);
           } else if (protocol == CodecProtocol.CompressedBinary) {
               bodyProtocol = new TCompactProtocol(transport);
           } else if (protocol == CodecProtocol.Json) {
               bodyProtocol = new TJSONProtocol(transport);
           } else {
               throw new TException("通讯协议不正确(包体协议)");
           }
           bodySerializer.write(body, bodyProtocol);

           // Frame terminator, then push everything into the buffer.
           headerProtocol.writeByte(ETX);
           transport.flush();

           return this.buffer;
       }
    
    • 发送请求
    • 等待服务端处理请求
    • 处理返回结果(反序列化返回的字节流)
    /**
     * Entry hook of this filter: performs the actual remote call.
     *
     * <p>Builds the request buffer, checks the channel, sends the request and obtains
     * the response buffer, decodes it, stores the decoded result in the filter context
     * under the {@code "result"} attribute, and then starts the exit phase on the
     * previous chain.
     *
     * @param ctx  filter context that receives the {@code "result"} attribute
     * @param next next chain element (not used here)
     * @throws SoaException as declared; presumably raised when sending or decoding fails
     */
    @Override
    public void onEntry(FilterContext ctx, FilterChain next) throws SoaException {
        final ByteBuf outbound = buildRequestBuf(service, version, method, seqid, request, requestSerializer);

        // TODO filter
        checkChannel();

        // Send the request and receive the raw response.
        final ByteBuf inbound = client.send(channel, seqid, outbound);

        final Result<RESP> decoded = processResponse(inbound, responseSerializer);
        ctx.setAttribute("result", decoded);

        onExit(ctx, getPrevChain(ctx));
    }
    

    4、服务端的处理

    • SoaServerHandler channelRead(ChannelHandlerContext ctx, Object msg)

    msg为收到的请求的字节流,服务端反序列化msg,拿到对应的soaheader,再从容器拿到注册的服务信息,调用对应的方法,得到结果。

        /**
         * Dispatches one decoded request to the target service method through the filter chain.
         *
         * <p>Looks up the function definition by the method name in the SOA header, reads
         * the request arguments from {@code contentProtocol}, logs the request, and then
         * runs a {@code SharedChain} made of a head filter, the container's filters and a
         * dispatch filter. The dispatch filter invokes the service method, synchronously or
         * asynchronously depending on {@code serviceDef.isAsync}, passes the result to
         * {@code processResult} and starts the exit phase of the chain.
         *
         * <p>Failures while invoking the method are logged and reported with
         * {@code writeErrorMessage}. This now includes an asynchronous future that completes
         * exceptionally; previously that exception was ignored and a {@code null} result
         * was processed as if the call had succeeded.
         *
         * @param channelHandlerContext Netty context used when writing the result or an error
         * @param contentProtocol       protocol from which the request arguments are read
         * @param serviceDef            definition of the target service (functions and implementation)
         * @param reqMessage            request buffer; always released when this method returns
         * @param context               transaction context carrying the SOA header
         * @param <I>                   service interface type
         * @param <REQ>                 request argument type
         * @param <RESP>                response type
         * @throws TException if reading the request arguments or running the chain fails
         */
        private <I, REQ, RESP> void processRequest(ChannelHandlerContext channelHandlerContext, TProtocol contentProtocol, SoaServiceDefinition<I> serviceDef,
                                                   ByteBuf reqMessage, TransactionContext context) throws TException {

            try {
                SoaHeader soaHeader = context.getHeader();
                Application application = container.getApplication(new ProcessorKey(soaHeader.getServiceName(), soaHeader.getVersionName()));

                SoaFunctionDefinition<I, REQ, RESP> soaFunction = (SoaFunctionDefinition<I, REQ, RESP>) serviceDef.functions.get(soaHeader.getMethodName());
                REQ args = soaFunction.reqSerializer.read(contentProtocol);
                contentProtocol.readMessageEnd();
                // Service implementation the function is invoked on.
                I iface = serviceDef.iface;
                //log request
                application.info(this.getClass(), "{} {} {} operatorId:{} operatorName:{} request body:{}", soaHeader.getServiceName(), soaHeader.getVersionName(), soaHeader.getMethodName(), soaHeader.getOperatorId(), soaHeader.getOperatorName(), formatToString(soaFunction.reqSerializer.toString(args)));

                HeadFilter headFilter = new HeadFilter();
                Filter dispatchFilter = new Filter() {

                    // Rebuilds the chain stored under "chain" at index size() - 2 for the exit phase.
                    private FilterChain getPrevChain(FilterContext ctx) {
                        SharedChain chain = (SharedChain) ctx.getAttach(this, "chain");
                        return new SharedChain(chain.head, chain.shared, chain.tail, chain.size() - 2);
                    }

                    @Override
                    public void onEntry(FilterContext ctx, FilterChain next) {
                        try {
                            if (serviceDef.isAsync) {
                                SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
                                CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
                                future.whenComplete((realResult, ex) -> {
                                    // The callback may run on another thread: re-bind the transaction context first.
                                    TransactionContext.Factory.setCurrentInstance(context);
                                    if (ex != null) {
                                        // The async call failed: report it the same way the sync path does
                                        // instead of processing a null result as a success.
                                        LOGGER.error(ex.getMessage(), ex);
                                        writeErrorMessage(channelHandlerContext, context, new SoaException(SoaCode.UnKnown, ex.getMessage()));
                                    } else {
                                        processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
                                        onExit(ctx, getPrevChain(ctx));
                                    }
                                });
                            } else {
                                SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
                                RESP result = (RESP) syncFunction.apply(iface, args);
                                processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
                                onExit(ctx, getPrevChain(ctx));
                            }
                        } catch (Exception e) {
                            LOGGER.error(e.getMessage(), e);
                            writeErrorMessage(channelHandlerContext, context, new SoaException(SoaCode.UnKnown, e.getMessage()));
                        }
                    }

                    @Override
                    public void onExit(FilterContext ctx, FilterChain prev) {
                        try {
                            prev.onExit(ctx);
                        } catch (TException e) {
                            LOGGER.error(e.getMessage(), e);
                        }
                    }
                };
                SharedChain sharedChain = new SharedChain(headFilter, container.getFilters(), dispatchFilter, 0);

                FilterContextImpl filterContext = new FilterContextImpl();
                // Stored so the dispatch filter can rebuild the chain for the exit phase.
                filterContext.setAttach(dispatchFilter, "chain", sharedChain);

                sharedChain.onEntry(filterContext);
            } finally {
                // The request arguments have already been read from the protocol at this point.
                reqMessage.release();
            }
        }
    

    相关文章

      网友评论

          本文标题:dapeng 调用

          本文链接:https://www.haomeiwen.com/subject/xydwzxtx.html