dapeng Invocation

Author: 偶像本人 | Source: published 2018-02-06 11:49

The dapeng invocation process

Tags (space-separated): dapeng


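1. Server startup (this section covers three plugins: spring, zookeeper and netty; once the server has started these three, it can communicate with clients). After the container has loaded each classloader, it starts the plugins one by one.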

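  • SpringPlugin: reads service.xml and loads it through Spring's ClassPathXmlApplicationContext to obtain the corresponding processor (the service details: the interface, its implementation, its methods, and whether each one is sync or async). The parsing is implemented in the dapeng-spring module of the dapeng project. Once loaded, the processor is registered with the container.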
   @Override
   @SuppressWarnings("unchecked")
   public SoaServiceDefinition<?> getObject() throws Exception {
       final Class<?> aClass = serviceRef.getClass();
       final List<Class<?>> interfaces = Arrays.asList(aClass.getInterfaces());

       List<Class<?>> filterInterfaces = interfaces.stream()
               .filter(anInterface -> anInterface.isAnnotationPresent(Service.class) && anInterface.isAnnotationPresent(Processor.class))
               .collect(toList());

       if (filterInterfaces.isEmpty()) {
           throw new RuntimeException("not config @Service & @Processor in " + refId);
       }

       Class<?> interfaceClass = filterInterfaces.get(filterInterfaces.size() - 1);

       Processor processor = interfaceClass.getAnnotation(Processor.class);

       Class<?> processorClass = Class.forName(processor.className(), true, interfaceClass.getClassLoader());
       Constructor<?> constructor = processorClass.getConstructor(interfaceClass,Class.class);
       SoaServiceDefinition tProcessor = (SoaServiceDefinition) constructor.newInstance(serviceRef,interfaceClass);

       return tProcessor;
   }
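  • zookeeper:
    Connects to zookeeper, obtains the service information from the container, and registers each service (for example /soa/runtime/services/com.isuwang.soa.order.service.EnquiryPriceService with instance info "10.117.17.97:9084:1.0.0"; in the code below the ip:port:version string is the name of the child node and the node data is empty).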
    @Override
    public void registerService(String serverName, String versionName) {
        try {
            // register the service info under the runtime node
            String path = "/soa/runtime/services/" + serverName + "/" + SoaSystemEnvProperties.SOA_CONTAINER_IP + ":" + SoaSystemEnvProperties.SOA_CONTAINER_PORT + ":" + versionName;
            String data = "";
            zooKeeperHelper.addOrUpdateServerInfo(path, data);

            // register the service info under the master node and run the master election
            // TODO optimize the election mechanism later
            if (SoaSystemEnvProperties.SOA_ZOOKEEPER_MASTER_ISCONFIG) {
                zooKeeperMasterHelper.createCurrentNode(ZookeeperHelper.generateKey(serverName, versionName));
            }
            else {
                zooKeeperHelper.createCurrentNode(ZookeeperHelper.generateKey(serverName, versionName));
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
  • nettyPlugin: listens on the same port that was registered in zookeeper; the main processing happens in the handlers added to the channel pipeline().
    @Override
    public void start() {
        LOGGER.warn("Plugin::NettyPlugin start");
        LOGGER.info("Bind Local Port {} [Netty]", port);

        new Thread("NettyContainer-Thread") {
            @Override
            public void run() {
                try {
                    bootstrap = new ServerBootstrap();

                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
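                                    ch.pipeline().addLast(new IdleStateHandler(15, 0, 0), // idle timeout: 15 s without reads
                                            new SoaDecoder(), // handles sticky and split packets (frame decoding)
                                            new SoaIdleHandler(),  // heartbeat handling
                                            new SoaServerHandler(container));  // invocation handling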
                                }
                            })
                            .option(ChannelOption.SO_BACKLOG, 1024)
                            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // reuse previously allocated memory (PooledByteBuf -> ByteBuf)
                            .childOption(ChannelOption.SO_KEEPALIVE, true)
                            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

                    // Start the server.
                    ChannelFuture f = bootstrap.bind(port).sync();

                    // Wait until the connection is closed.
                    f.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    LOGGER.error(e.getMessage(), e);
                } finally {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                }
            }
        }.start();
    }
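
The registration path used by the zookeeper plugin above can be built and parsed with plain string handling. The following is a minimal sketch, not dapeng code: the class RegistryPathSketch, the Instance type and both helper methods are hypothetical names, and it assumes the host is an IPv4 address or hostname (an IPv6 literal contains ":" and would need different parsing).

```java
class RegistryPathSketch {

    // Root of the runtime nodes, as used in registerService above.
    static final String RUNTIME_ROOT = "/soa/runtime/services/";

    // Hypothetical value type describing one registered service instance.
    static final class Instance {
        final String serviceName;
        final String host;
        final int port;
        final String version;

        Instance(String serviceName, String host, int port, String version) {
            this.serviceName = serviceName;
            this.host = host;
            this.port = port;
            this.version = version;
        }
    }

    // Builds /soa/runtime/services/<service>/<ip>:<port>:<version>.
    static String buildPath(String serviceName, String host, int port, String version) {
        return RUNTIME_ROOT + serviceName + "/" + host + ":" + port + ":" + version;
    }

    // Reverses buildPath; rejects anything that does not have the expected shape.
    static Instance parsePath(String path) {
        if (!path.startsWith(RUNTIME_ROOT)) {
            throw new IllegalArgumentException("not a runtime path: " + path);
        }
        String rest = path.substring(RUNTIME_ROOT.length());
        int slash = rest.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("missing instance node: " + path);
        }
        String[] parts = rest.substring(slash + 1).split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected ip:port:version in: " + path);
        }
        return new Instance(rest.substring(0, slash), parts[0], Integer.parseInt(parts[1]), parts[2]);
    }

    public static void main(String[] args) {
        String path = buildPath("com.isuwang.soa.order.service.EnquiryPriceService",
                "10.117.17.97", 9084, "1.0.0");
        System.out.println(path);
        Instance instance = parsePath(path);
        System.out.println(instance.host + " " + instance.port + " " + instance.version);
    }
}
```

A client that lists the children of a service node could run each child name through a parser like this to recover the address it should connect to.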

2. Structure of the generated code (domain, enums, service, sync and async clients, codec, and xml files are generated)

  • domain: the defined entity classes
  • enums: the defined enums
  • client:
public class AdminCacheServiceClient implements AdminCacheService {
   private final String serviceName;
   private final String version;

   private SoaConnectionPool pool;

   public AdminCacheServiceClient() {
       this.serviceName = "com.isuwang.soa.admin.service.AdminCacheService";
       this.version = "1.0.0";

       ServiceLoader<SoaConnectionPoolFactory> factories = ServiceLoader.load(SoaConnectionPoolFactory.class);
       for (SoaConnectionPoolFactory factory : factories) {
           this.pool = factory.getPool();
           break;
       }
       this.pool.registerClientInfo(serviceName, version);
   }
   
   public java.util.Map<Integer, String> findStaffNamesByIds(java.util.Set<Integer> staffIds) throws SoaException {
       String methodName = "findStaffNamesByIds";

       // wrap the arguments in the generated args object
       findStaffNamesByIds_args findStaffNamesByIds_args = new findStaffNamesByIds_args();
       findStaffNamesByIds_args.setStaffIds(staffIds);

       // send through the connection pool with the matching request/response serializers
       findStaffNamesByIds_result response = pool.send(serviceName, version, methodName, findStaffNamesByIds_args, new FindStaffNamesByIds_argsSerializer(), new FindStaffNamesByIds_resultSerializer());

       return response.getSuccess();
   }

   // ... the remaining generated methods follow the same pattern
}
  • service: the defined interface
@Service(name = "com.isuwang.soa.admin.service.AdminCacheService", version = "1.0.0")
@Processor(className = "com.isuwang.soa.admin.AdminCacheServiceCodec$Processor")
public interface AdminCacheService {

   java.util.Map<Integer, String> findStaffNamesByIds(java.util.Set<Integer> staffIds) throws com.github.dapeng.core.SoaException;
   
   String findStaffName(Integer staffId) throws com.github.dapeng.core.SoaException;
   
   com.isuwang.soa.admin.domain.TStaffCache getEntity(Integer staffId) throws com.github.dapeng.core.SoaException;
   
   java.util.Set<Integer> getDescendantStaffs(Integer orgId) throws com.github.dapeng.core.SoaException;
}
  • codec: contains the argument objects, the result objects, their serializers, and the processors
 public static class findStaffNamesByIds<I extends com.isuwang.soa.admin.service.AdminCacheService> extends SoaFunctionDefinition.Sync<I, findStaffNamesByIds_args, findStaffNamesByIds_result> {
        public findStaffNamesByIds() {
            super("findStaffNamesByIds", new FindStaffNamesByIds_argsSerializer(), new FindStaffNamesByIds_resultSerializer());
        }

        @Override
        public findStaffNamesByIds_result apply(I iface, findStaffNamesByIds_args findStaffNamesByIds_args) throws SoaException {

            findStaffNamesByIds_result result = new findStaffNamesByIds_result();
            result.success = iface.findStaffNamesByIds(findStaffNamesByIds_args.staffIds);
            return result;
        }

    }
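
The generated pieces listed above fit together in a simple way: each method gets an args wrapper, a result wrapper, and a function object that unwraps the arguments, calls the service implementation, and wraps the return value. The sketch below illustrates only that shape. SyncFunction, StaffService and the wrapper classes are hypothetical, simplified stand-ins (serializers are left out), not the generated dapeng types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class CodecPatternSketch {

    // Hypothetical service interface reduced to a single method.
    interface StaffService {
        Map<Integer, String> findStaffNamesByIds(Set<Integer> staffIds);
    }

    // Generated-style wrappers: one object for the arguments, one for the result.
    static final class FindStaffNamesByIdsArgs {
        Set<Integer> staffIds;
    }

    static final class FindStaffNamesByIdsResult {
        Map<Integer, String> success;
    }

    // Hypothetical simplification of a synchronous function definition (serializers omitted).
    interface SyncFunction<I, REQ, RESP> {
        RESP apply(I iface, REQ args);
    }

    // Server side: unwrap the args, call the implementation, wrap the return value.
    static final SyncFunction<StaffService, FindStaffNamesByIdsArgs, FindStaffNamesByIdsResult> FIND_STAFF_NAMES =
            (iface, args) -> {
                FindStaffNamesByIdsResult result = new FindStaffNamesByIdsResult();
                result.success = iface.findStaffNamesByIds(args.staffIds);
                return result;
            };

    // Toy implementation used only for the demonstration.
    static final StaffService DEMO_SERVICE = staffIds -> {
        Map<Integer, String> names = new HashMap<>();
        for (Integer id : staffIds) {
            names.put(id, "staff-" + id);
        }
        return names;
    };

    public static void main(String[] argv) {
        FindStaffNamesByIdsArgs args = new FindStaffNamesByIdsArgs();
        args.staffIds = new java.util.HashSet<>(java.util.Arrays.asList(7, 11));
        FindStaffNamesByIdsResult result = FIND_STAFF_NAMES.apply(DEMO_SERVICE, args);
        System.out.println(result.success);
    }
}
```

Keeping the arguments and the result in dedicated wrapper objects means every method can be serialized as a single struct in each direction, whatever its parameter list looks like.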

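3. Sending a request from the client (to call a service, the client must depend on dapeng-netty-client; in older versions the artifact was dapeng-remoting-client)

request -> establish a connection -> serialize the request into a byte stream -> send (channel.writeAndFlush(request))

  • Establish a connection: the client obtains the SoaConnectionPool, connects to zookeeper, and looks up the connection information for the service name and version (ip and port; there may be several instances). A load-balancing strategy then selects one of them (strategies: random, round robin, least active calls, consistent hash).
  • Process the request (it passes through the filters)
  • Serialize (frame layout, sizes in bytes):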
    field         length  stx(0x02)  version(1)  codec protocol(1)  seq(1)  soaHeader  request  etx(3)
    size (bytes)  4       1          1           1                  4       n          n        1
   public ByteBuf build() throws TException {
        InvocationContext invocationCtx = InvocationContextImpl.Factory.getCurrentInstance();
        //buildHeader
        protocol = protocol == null ? (invocationCtx.getCodecProtocol() == null ? CodecProtocol.CompressedBinary
                : invocationCtx.getCodecProtocol()) : protocol;
        TSoaTransport transport = new TSoaTransport(buffer);
        TBinaryProtocol headerProtocol = new TBinaryProtocol(transport);
        headerProtocol.writeByte(STX);
        headerProtocol.writeByte(VERSION);
        headerProtocol.writeByte(protocol.getCode());
        headerProtocol.writeI32(seqid);
        new SoaHeaderSerializer().write(header, headerProtocol);

        // write body
        TProtocol bodyProtocol = null;
        switch (protocol) {
            case Binary:
                bodyProtocol = new TBinaryProtocol(transport);
                break;
            case CompressedBinary:
                bodyProtocol = new TCompactProtocol(transport);
                break;
            case Json:
                bodyProtocol = new TJSONProtocol(transport);
                break;
            default:
                throw new TException("通讯协议不正确(包体协议)"); // "invalid codec protocol (body protocol)"
        }
        bodySerializer.write(body, bodyProtocol);

        headerProtocol.writeByte(ETX);
        transport.flush();

        return this.buffer;
    }
  • Send the request
  • Wait for the server to process it
  • Handle the response (deserialize the returned byte stream)
            @Override
            public void onEntry(FilterContext ctx, FilterChain next) throws SoaException {

                ByteBuf requestBuf = buildRequestBuf(service, version, method, seqid, request, requestSerializer);

                // TODO filter
                checkChannel();
                ByteBuf responseBuf = client.send(channel, seqid, requestBuf); // send the request; returns the response

                Result<RESP> result = processResponse(responseBuf, responseSerializer);
                ctx.setAttribute("result", result);

                onExit(ctx, getPrevChain(ctx));
            }
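
Two of the load-balancing strategies named in the connection step, random and round robin, are easy to sketch over a list of "ip:port" strings. This is an illustration under assumptions, not the dapeng implementation: the Strategy interface and both classes are hypothetical names, and the instance list is assumed to have been read from zookeeper already.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalanceSketch {

    // Hypothetical strategy interface: pick one "ip:port" entry from the candidates.
    interface Strategy {
        String select(List<String> instances);
    }

    // Round robin: a shared counter walks through the list and wraps around.
    static final class RoundRobin implements Strategy {
        private final AtomicInteger next = new AtomicInteger();

        @Override
        public String select(List<String> instances) {
            if (instances.isEmpty()) {
                throw new IllegalStateException("no available instance");
            }
            // floorMod keeps the index non-negative even after the counter overflows
            int index = Math.floorMod(next.getAndIncrement(), instances.size());
            return instances.get(index);
        }
    }

    // Random: every instance is chosen with equal probability.
    static final class RandomChoice implements Strategy {
        private final Random random;

        RandomChoice(Random random) {
            this.random = random;
        }

        @Override
        public String select(List<String> instances) {
            if (instances.isEmpty()) {
                throw new IllegalStateException("no available instance");
            }
            return instances.get(random.nextInt(instances.size()));
        }
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("10.0.0.1:9084", "10.0.0.2:9084", "10.0.0.3:9084");
        Strategy roundRobin = new RoundRobin();
        for (int i = 0; i < 4; i++) {
            System.out.println(roundRobin.select(instances));
        }
        System.out.println(new RandomChoice(new Random()).select(instances));
    }
}
```

Least active calls and consistent hash need extra state (per-instance call counters, a hash ring), so they are left out of this sketch.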

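4. Server-side processing

  • SoaServerHandler channelRead(ChannelHandlerContext ctx, Object msg)

msg is the byte stream of the received request. The server deserializes msg to obtain the soaHeader, looks up the registered service definition in the container, invokes the corresponding method, and gets the result.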

    private <I, REQ, RESP> void processRequest(ChannelHandlerContext channelHandlerContext, TProtocol contentProtocol, SoaServiceDefinition<I> serviceDef,
                                               ByteBuf reqMessage, TransactionContext context) throws TException {

        try {
            SoaHeader soaHeader = context.getHeader();
            Application application = container.getApplication(new ProcessorKey(soaHeader.getServiceName(), soaHeader.getVersionName()));

            SoaFunctionDefinition<I, REQ, RESP> soaFunction = (SoaFunctionDefinition<I, REQ, RESP>) serviceDef.functions.get(soaHeader.getMethodName());
            REQ args = soaFunction.reqSerializer.read(contentProtocol);
            contentProtocol.readMessageEnd();
            // the service implementation that will handle the call
            I iface = serviceDef.iface;
            //log request
            application.info(this.getClass(), "{} {} {} operatorId:{} operatorName:{} request body:{}", soaHeader.getServiceName(), soaHeader.getVersionName(), soaHeader.getMethodName(), soaHeader.getOperatorId(), soaHeader.getOperatorName(), formatToString(soaFunction.reqSerializer.toString(args)));

            HeadFilter headFilter = new HeadFilter();
            Filter dispatchFilter = new Filter() {

                private FilterChain getPrevChain(FilterContext ctx) {
                    SharedChain chain = (SharedChain) ctx.getAttach(this, "chain");
                    return new SharedChain(chain.head, chain.shared, chain.tail, chain.size() - 2);
                }

                @Override
                public void onEntry(FilterContext ctx, FilterChain next) {
                    try {
                        if (serviceDef.isAsync) {
                            SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
                            CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
                            future.whenComplete((realResult, ex) -> {
                                TransactionContext.Factory.setCurrentInstance(context);
                                processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
                                onExit(ctx, getPrevChain(ctx));
                            });
                        } else {
                            SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
                            RESP result = (RESP) syncFunction.apply(iface, args);
                            processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
                            onExit(ctx, getPrevChain(ctx));
                        }
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage(), e);
                        writeErrorMessage(channelHandlerContext, context, new SoaException(SoaCode.UnKnown, e.getMessage()));
                    }
                }

                @Override
                public void onExit(FilterContext ctx, FilterChain prev) {
                    try {
                        prev.onExit(ctx);
                    } catch (TException e) {
                        LOGGER.error(e.getMessage(), e);
                    }
                }
            };
            SharedChain sharedChain = new SharedChain(headFilter, container.getFilters(), dispatchFilter, 0);

            FilterContextImpl filterContext = new FilterContextImpl();
            filterContext.setAttach(dispatchFilter, "chain", sharedChain);

            sharedChain.onEntry(filterContext);
        } finally {
            reqMessage.release();
        }
    }
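
Before the soaHeader can be deserialized, the fixed-size fields at the start of the frame listed in section 3 have to be consumed. The sketch below does a round trip of just those fields with java.nio.ByteBuffer. It rests on assumptions that the article does not state: big-endian integers, a length field that counts every byte after itself, and an opaque payload standing in for soaHeader plus request. FramePrefixSketch and Prefix are hypothetical names; in dapeng the real work is done by SoaDecoder and the Thrift protocol classes.

```java
import java.nio.ByteBuffer;

class FramePrefixSketch {

    static final byte STX = 0x02;
    static final byte ETX = 0x03;
    // stx + version + protocol + seqid + etx, without the payload
    static final int FIXED_BYTES = 1 + 1 + 1 + 4 + 1;

    // Hypothetical holder for the fixed-size fields at the start of a frame.
    static final class Prefix {
        final int length;
        final byte version;
        final byte protocol;
        final int seqid;

        Prefix(int length, byte version, byte protocol, int seqid) {
            this.length = length;
            this.version = version;
            this.protocol = protocol;
            this.seqid = seqid;
        }
    }

    // Layout: length(4) stx(1) version(1) protocol(1) seqid(4) payload(n) etx(1).
    // Assumption: length counts every byte that follows the length field.
    static byte[] encode(byte version, byte protocol, int seqid, byte[] payload) {
        int length = FIXED_BYTES + payload.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length); // ByteBuffer is big-endian by default
        buf.putInt(length).put(STX).put(version).put(protocol).putInt(seqid).put(payload).put(ETX);
        return buf.array();
    }

    // Reads the fixed fields back and checks the frame delimiters.
    static Prefix decode(byte[] frame) {
        if (frame.length < 4 + FIXED_BYTES) {
            throw new IllegalArgumentException("frame too short");
        }
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int length = buf.getInt();
        if (length != frame.length - 4) {
            throw new IllegalArgumentException("length mismatch");
        }
        if (buf.get() != STX) {
            throw new IllegalArgumentException("missing STX");
        }
        byte version = buf.get();
        byte protocol = buf.get();
        int seqid = buf.getInt();
        if (frame[frame.length - 1] != ETX) {
            throw new IllegalArgumentException("missing ETX");
        }
        return new Prefix(length, version, protocol, seqid);
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 1, (byte) 1, 42, new byte[] {9, 8, 7});
        Prefix prefix = decode(frame);
        System.out.println("length=" + prefix.length + " seqid=" + prefix.seqid);
    }
}
```

The seqid read here is what lets a client match a response to the request it sent, and the protocol byte tells the receiver which body protocol to use for the bytes that follow the header.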
