美文网首页
dubbo源码(七)-spring、zookeeper、nett

dubbo源码(七)-spring、zookeeper、nett

作者: KissGoodby | 来源:发表于2019-02-17 22:54 被阅读0次

    背景

    这里我们简单学习一些spring、zookeeper、netty的知识点,因为在后续的dubbo源码中会涉及到这些。如果我们不知道来龙去脉,在理解dubbo源码上有点困难。

    1. spring自定义XML标签及其解析

    在基于spring开发中,我们很常见使用xml配置一些属性,例如dubbo的配置:

    <dubbo:application name="provider" owner="hui.wang" organization="dubbo-learn"/>
    

    大家可否想过怎么自定义这种配置?怎么使用利用spring读取到自定义的配置?

    现在我们开始基于spring自定义标签及其解析。

    1. 编写一个模型类,模型的属性是配置文件对应的属性。这里我简单的编写一个模型类,取名为MyXml,代码如下:
    /**
     * @author hui.wang09
     * @since 17 February 2019
     */
    public class MyXml {
    
        private String name;
        private String value;
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getValue() {
            return value;
        }
    
        public void setValue(String value) {
            this.value = value;
        }
    }
    
    1. 定义xsd文件,这里取名为myXml.xsd,代码如下:
    <xsd:schema
            xmlns="http://hui.wang.com/schema"
            xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            targetNamespace="http://hui.wang.com/schema">
    
        <xsd:complexType name="elementname">
            <xsd:attribute name="name" type="xsd:string">
                <xsd:annotation>
                    <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>
            <xsd:attribute name="value" type="xsd:string">
                <xsd:annotation>
                    <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>
        </xsd:complexType>
    
        <xsd:element name="elementname" type="elementname">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ elementname1的文档 ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:element>
    </xsd:schema>
    
    • 这里的targetNamespacexmlns要一致
    • xsd:element将来会在xml文件中用到的元素,例如<dubbo:application>中的application
    • xsd:attribute定义的就是模型类中的属性,例如<dubbo:application name="xxx">中的name,并且可以指定属性类型,进而起到检测的作用.
    1. 编写spring.schemas文件,该文件用来指定xsd文件的位置。我这里配置如下:
    http\://hui.wang.com/schema/myXml.xsd=META-INF/myXml.xsd
    

    这里的http\://hui.wang.com/schema/要与xsd文件中的targetNamespace一致

    1. 编写BeanDefinition解析器,主要用来解析自定义的xml标签。我这里定义一个类,取名MyXmlBeanDefinitionParser,代码如下:
    /**
     * @author hui.wang09
     * @since 17 February 2019
     */
    public class MyXmlBeanDefinitionParser implements BeanDefinitionParser{
    
        private final Class<?> beanClass;
    
        public MyXmlBeanDefinitionParser(Class<?> beanClass) {
            this.beanClass = beanClass;
        }
    
        @Override
        public BeanDefinition parse(Element element, ParserContext parserContext) {
            RootBeanDefinition beanDefinition = new RootBeanDefinition();
            beanDefinition.setBeanClass(beanClass);
            beanDefinition.setLazyInit(false);
            beanDefinition.getPropertyValues().add("name", element.getAttribute("name"));
            beanDefinition.getPropertyValues().add("value", element.getAttribute("value"));
            BeanDefinitionRegistry beanDefinitionRegistry = parserContext.getRegistry();
            beanDefinitionRegistry.registerBeanDefinition(beanClass.getName(),beanDefinition);
            return beanDefinition;
        }
    }
    
    1. 编写命名空间处理器,主要用来注册BeanDefinition解析器。我这边定义一个类,取名为MyXmlNamespaceHandler,代码如下:
    /**
     * @author hui.wang09
     * @since 17 February 2019
     */
    @Service
    public class MyXmlNamespaceHandler extends NamespaceHandlerSupport{
    
        @Override
        public void init() {
            registerBeanDefinitionParser("elementname", new MyXmlBeanDefinitionParser(MyXml.class));
        }
    }
    
    1. 编写spring.handlers文件,主要用于关联命名空间处理器和xsd中的targetNamespace。代码如下:
    http\://hui.wang.com/schema=com.hui.wang.dubbo.learn.provider.spring.xml.MyXmlNamespaceHandler
    
    1. 开始测试,开始编写xml文件,编写一个spring xml文件,内容如下:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:myXml="http://hui.wang.com/schema"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://hui.wang.com/schema http://hui.wang.com/schema/myXml.xsd">
    
    
        <myXml:elementname value="hui.wang" name="yes"/>
        
    </beans>
    

    xmlns:myXml="http://hui.wang.com/schema"myXml.xsd文件中的targetNamespace

    编写一个测试类,代码如下:

    /**
     * @author hui.wang09
     * @since 17 February 2019
     */
    public class MyXmlTest {
    
        public static void main(String[] args) throws Exception{
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("myXml.xml");
            MyXml myXml = (MyXml) applicationContext.getBean(MyXml.class.getName());
            System.out.println(JSON.json(myXml));
        }
    }
    

    打印结果如下:

    {"name":"yes","value":"hui.wang"}
    

    通过这个例子,大家应该明白了怎么自定义xml配置和怎么解析xml的配置了。

    2. spring 对外暴露的接口

    这里我们将对InitializingBeanDisposableBeanApplicationContextAwareApplicationListenerBeanNameAwareApplicationEventPublisherAware接口含义和使用。

    这里我编写了一个类,实现了上面说的所有接口,取名SpringUtils,代码如下:

    /**
     * @author hui.wang09
     * @since 15 February 2019
     */
    @Service
    public class SpringUtils implements InitializingBean, DisposableBean, ApplicationContextAware,
            ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SpringUtils.class);
    
        /**
         * org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
         * 这个方法只是简单的返回我们当前的beanName
         */
        @Override
        public void setBeanName(String s) {
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
            LOGGER.info("org.springframework.beans.factory.BeanNameAware#setBeanName 调用,s={}", s);
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
        }
    
        /**
         * org.springframework.beans.factory.DisposableBean#destroy()
         * 在spring初始化bean的时候,如果bean实现了DisposableBean接口,会自动在销毁时调用destroy方法
         */
        @Override
        public void destroy() throws Exception {
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
            LOGGER.info("org.springframework.beans.factory.DisposableBean#destroy()");
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
        }
    
        /**
         * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
         * 在spring初始化bean的时候,如果bean实现了InitializingBean接口,会自动调用afterPropertiesSet方法。
         * spring为bean提供了两种初始化bean的方式,实现InitializingBean接口,实现afterPropertiesSet方法,
         * 或者在配置文件中同过init-method指定,两种方式可以同时使用
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
            LOGGER.info("org.springframework.beans.factory.InitializingBean#afterPropertiesSet()");
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
        }
    
        /**
         * org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
         * 实现该接口的类,可以在spring容器初始化的时候调用setApplicationContext方法,从而获得ApplicationContext中的所有bean。
         */
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
            LOGGER.info("org.springframework.context.ApplicationContextAware#setApplicationContext, id={}, appName={}", applicationContext.getId(), applicationContext.getApplicationName());
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
        }
    
        /**
         * org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
         * 这个方法会通知的所有与事件相匹配的监听器。这些监听可能是spring框架的监听器,也有可能是特定的监听器。
         */
        @Override
        public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
            LOGGER.info("org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher");
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
        }
    
        /**
         * org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
         * 每当ApplicationContext发布ApplicationEvent时,改方法将自动被触发
         */
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
            LOGGER.info("org.springframework.context.ApplicationListener#onApplicationEvent");
            LOGGER.info("========================================================");
            LOGGER.info("========================================================");
        }
    }
    

    然后我们启动spring容器,打印结果如下:

    22:24:47.592 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.593 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.593 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.beans.factory.BeanNameAware#setBeanName 调用,s=springUtils
    22:24:47.596 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.596 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationContextAware#setApplicationContext, id=org.springframework.web.context.WebApplicationContext:, appName=
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:48.176 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationListener#onApplicationEvent
    22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:49.040 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:49.041 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:49.043 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationListener#onApplicationEvent
    22:24:49.044 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    22:24:49.044 [RMI TCP Connection(2)-127.0.0.1] INFO  c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
    

    这里我们通过日志,可以看到这几个方法的调用先后关系和调用时机,同时也可以看到这几个方法被调用时的参数。接下来,我们对这几个接口进行一个简单的说明。

    1. InitializingBean接口,只有一个afterPropertiesSet方法,在spring初始化bean的时候,如果bean实现了InitializingBean接口,会自动调用afterPropertiesSet方法。
    2. DisposableBean接口,只有一个destroy方法,在spring初始化bean的时候,如果bean实现了DisposableBean接口,会自动在销毁时调用destroy方法。
    3. ApplicationContextAware接口,只有一个setApplicationContext方法,实现该接口的类,可以在spring容器初始化的时候调用setApplicationContext方法,从而获得ApplicationContext中的所有bean。
    4. ApplicationListener接口,只有一个接口onApplicationEvent,每当ApplicationContext发布ApplicationEvent时,改方法将自动被触发。
    5. BeanNameAware接口,只有一个接口setBeanName,这个方法只是简单的返回我们当前的beanName
    6. ApplicationEventPublisherAware接口,只有一个接口setApplicationEventPublisher,这个方法会通知的所有与事件相匹配的监听器。这些监听可能是spring框架的监听器,也有可能是特定的监听器。

    上面的几个接口的讲解就到这了。下面说一个不是spring的知识点,也是在后续dubbo源码中会碰到,就是Javassit

    3. Javassit

    说到Javassit,需要提到动态编程,动态编程是相对于静态编程而言的,平时我们讨论比较多的就是静态编程语言,例如Java,与动态编程语言,例如JavaScript。那二者有什么明显的区别呢?简单的说就是在静态编程中,类型检查是在编译时完成的,而动态编程中类型检查是在运行时完成的。所谓动态编程就是绕过编译过程在运行时进行操作的技术。

    在静态语言中引入动态特性,主要是为了解决一些使用场景的痛点。其实完全使用静态编程也办的到,只是付出的代价比较高,没有动态编程来的优雅。例如依赖注入框架Spring使用了反射。

    此处我们主要说一下通过动态生成字节码的方式。操作java字节码的工具有两个比较流行,一个是ASM,一个是Javassit 。

    • ASM :直接操作字节码指令,执行效率高,要是使用者掌握Java类字节码文件格式及指令,对使用者的要求比较高。
    • Javassit: 提供了更高级的API,执行效率相对较差,但无需掌握字节码指令的知识,对使用者要求较低。

    Javassist是一个开源的分析、编辑和创建Java字节码的类库。Javassist中最为重要的是ClassPoolCtClassCtMethod 以及 CtField这几个类。

    • ClassPool:一个基于HashMap实现的CtClass对象容器,其中键是类名称,值是表示该类的CtClass对象。默认的ClassPool使用与底层JVM相同的类路径,因此在某些情况下,可能需要向ClassPool添加类路径或类字节。
    • CtClass:表示一个类,这些CtClass对象可以从ClassPool获得。
    • CtMethods:表示类中的方法。
    • CtFields :表示类中的字段。

    下面,编写一个简单的实例,演示一下Javassist的使用:

    1. 先编写一个model类,取名为Student,代码如下:
    public class Student {
    }
    

    这里需要说明一下的是,这是一个空的class,没有任何的属性和方法。下面我们就使用Javassist,进行动态编程。

    1. Javassist的使用代码,代码如下:
    public class TestJavassitCompiler {
    
        public static void main(String[] args) throws Exception{
            ClassPool pool = ClassPool.getDefault();
            CtClass ctClass = pool.makeClass("com.hui.wang.dubbo.learn.provider.javassist.model.Student");
    
            //添加属性:private String name
            CtField nameField = new CtField(pool.getCtClass("java.lang.String"), "name", ctClass);
            nameField.setModifiers(Modifier.PRIVATE);
            ctClass.addField(nameField);
    
            //添加属性:private int age
            CtField ageField = new CtField(pool.getCtClass("int"), "age", ctClass);
            ageField.setModifiers(Modifier.PRIVATE);
            ctClass.addField(ageField);
    
            //getter和setter
            ctClass.addMethod(CtNewMethod.getter("getName", nameField));
            ctClass.addMethod(CtNewMethod.setter("setName", nameField));
            ctClass.addMethod(CtNewMethod.getter("getAge", ageField));
            ctClass.addMethod(CtNewMethod.setter("setAge", ageField));
    
            //创建构造器
            CtConstructor ctConstructor = new CtConstructor(new CtClass[] {}, ctClass);
            String body = new StringBuilder("{\nthis.age = 1;\nthis.name = \"hui.wang\";\n}").toString();
            ctConstructor.setBody(body);
            ctClass.addConstructor(ctConstructor);
    
            //普通方法
            CtMethod ctMethod = new CtMethod(CtClass.voidType, "commonMethod", new CtClass[] {}, ctClass);
            ctMethod.setModifiers(Modifier.PUBLIC);
            ctMethod.setBody(new StringBuilder("{\n System.out.println(\"this is a common method\"); \n" +
                    "\n System.out.println(this.getAge()); \n}").toString());
            ctClass.addMethod(ctMethod);
    
            Class<?> clazz = ctClass.toClass();
            Object obj = clazz.newInstance();
            //方法调用
            obj.getClass().getMethod("commonMethod", new Class[] {}).invoke(obj, new Object[] {});
        }
    }
    

    打印结果为:

    this is a common method
    1
    

    4. 关于netty3简单使用

    在dubbo中,默认是使用netty3进行通讯的,所以接下来简单介绍一下netty3怎么使用的,网络通讯分为server和client端,下来我们用netty分别创建server和client

    server端
    1. 先创建一个server类,代码如下:
    /**
     * netty3 server端
     *
     * @author hui.wang09
     * @since 22 February 2019
     */
    public class NettyServer {
    
        /**
         * 1、首先创建了NioServerSocketChannelFactory:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个)
         * 2、创建ServerBootstrap server端启动辅助类
         * 3、为ServerBootstrap设置ChannelPipelineFactory工厂,并为ChannelPipelineFactory将来创建出的ChannelPipeline设置编码器/解码器/事件处理器
         * 4、使用ServerBootstrap绑定监听地址和端口
         */
        public void start() {
            ChannelFactory factory = new NioServerSocketChannelFactory(
                    // boss线程池
                    Executors.newCachedThreadPool(),
                    // worker线程池
                    Executors.newCachedThreadPool(),
                    8
            );
    
            ServerBootstrap bootstrap = new ServerBootstrap(factory);
    
            /**
             * 对于每一个连接channel, server都会调用PipelineFactory为该连接创建一个ChannelPipline
             */
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() throws Exception {
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());
                    pipeline.addLast("handler", new ServerLogicHandler());
                    return pipeline;
                }
            });
    
            Channel channel = bootstrap.bind(new InetSocketAddress("127.0.0.1", 8088));
            System.out.println("server start success");
        }
    
        public static void main(String[] args) throws InterruptedException {
            NettyServer server = new NettyServer();
            server.start();
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    

    使用netty3创建server端的步骤如下:

    • 首先创建了NioServerSocketChannelFactory:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个)
    • 创建ServerBootstrap server端启动辅助类
    • ServerBootstrap设置ChannelPipelineFactory工厂,并为ChannelPipelineFactory将来创建出的ChannelPipeline设置编码器/解码器/事件处理器
    • 使用ServerBootstrap绑定监听地址和端口
    1. 设置自己的事件处理器,就是创建server的这句话pipeline.addLast("handler", new ServerLogicHandler());,其中ServerLogicHandler就是自己的事件处理器,代码如下:
    /**
     * 事件处理器
     * 监听与客户端连接成功事件
     * 监听接收到来自客户端的消息,之后写回给客户端消息
     * 捕捉异常事件
     *
     * @author hui.wang09
     * @since 22 February 2019
     */
    public class ServerLogicHandler extends SimpleChannelHandler{
    
        /**
         * 监听接收到来自客户端的消息,之后写回给客户端消息
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            String msg = (String) e.getMessage();
            System.out.println("接收到了client的消息, msg: " + msg);
    
            Channel channel = e.getChannel();
            String str = "hi, client";
    
            //写消息发给client端
            channel.write(str);
            System.out.println("服务端发送数据: " + str + "完成");
        }
    
        /**
         * 捕捉异常事件
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }
    
        /**
         * 监听与客户端连接成功事件
         */
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("连接成功, channel: " + e.getChannel().toString());
        }
    }
    

    这个类的注释很多,这里不做说明,接下来就是client端了

    client端
    1. 和server端类似,先用netty3创建client端,代码如下:
    /**
     * netty3 client
     *
     * @author hui.wang09
     * @since 22 February 2019
     */
    public class NettyClient {
    
        /**
         * 1、首先创建了NioClientSocketChannelFactory:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个)
         * 2、创建ClientBootstrap client端启动辅助类
         * 3、为ClientBootstrap设置ChannelPipelineFactory工厂,并为ChannelPipelineFactory将来创建出的ChannelPipeline设置编码器/解码器/事件处理器
         * 4、使用ClientBootstrap连接Server端监听的地址和端口
         */
        public static void main(String[] args) {
            ChannelFactory channelFactory = new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool(),
                    8
            );
    
            ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() throws Exception {
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());
                    pipeline.addLast("handler", new ClientLogicHandler());
                    return pipeline;
                }
            });
    
            bootstrap.connect(new InetSocketAddress("127.0.0.1", 8088));
            System.out.println("client start success");
        }
    }
    

    使用netty3创建步骤:

    • 首先创建了NioClientSocketChannelFactory:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个)
    • 创建ClientBootstrap client端启动辅助类
    • ClientBootstrap设置ChannelPipelineFactory工厂,并为ChannelPipelineFactory将来创建出的ChannelPipeline设置编码器/解码器/事件处理器
    • 使用ClientBootstrap连接Server端监听的地址和端口
    1. 设置自己的事件处理器,就是创建client的这句话pipeline.addLast("handler", new ClientLogicHandler());,其中ClientLogicHandler就是自己的事件处理器,代码如下:
    /**
     * 事件处理器
     *
     * 监听与服务端连接成功事件,连接成功后,写消息给服务端
     * 监听向服务端写消息完成的事件
     * 监听接收到来自服务端的消息
     * 捕捉异常事件
     *
     * @author hui.wang09
     * @since 22 February 2019
     */
    public class ClientLogicHandler extends SimpleChannelHandler {
    
        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("客户端连接成功!");
            String str = "hi server!";
            //异步
            e.getChannel().write(str);
        }
    
        @Override
        public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
            System.out.println("客户端写消息完成");
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            String msg = (String) e.getMessage();
            System.out.println("客户端接收到消息, msg: " + msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }
    }
    

    到这里,server端和client端编写完成,分别启动sever端和client端,查看控制台打印结果:

    1. server端:
    server start success
    连接成功, channel: [id: 0xcd07c136, /127.0.0.1:53966 => /127.0.0.1:8088]
    接收到了client的消息, msg: hi server!
    服务端发送数据: hi, client完成
    
    1. client端:
    client start success
    客户端连接成功!
    客户端写消息完成
    客户端接收到消息, msg: hi, client
    

    5. zookeeper的Curator简单使用

    使用zookeeper原生API实现一些复杂的东西比较麻烦。所以,出现了两款比较好的开源客户端,对zookeeper的原生API进行了包装:zkClient和curator。后者是Netflix出版的,必属精品,也是最好用的zk的开源客户端。

    Curator使用示例
    1. 添加maven依赖
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>
    
    1. 使用Curator连接zookeeper
        private static CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
    
    

    参数说明:

    • connectString:zk的server地址,多个server之间使用英文逗号分隔开
    • connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
    • sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
    • retryPolicy:失败重试策略

    关于重试策略 ExponentialBackoffRetry:构造器含有三个参数ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

    • baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
      计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
    • maxRetries:最大重试次数
    • maxSleepMs:最大sleep时间
    1. 创建节点
                    // 创建回话
            client.start();
    
            //创建一个初始内容为空的节点
            client.create().forPath("/China");
            client.create().forPath("/America", "zhangsan".getBytes());
            //创建一个初始内容为空的临时节点
            client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");
            //递归创建,/Russia是持久节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());
    
    1. 获取节点内容
                    client.start();
            byte[] data = client.getData().forPath("/America");
            System.out.println(new String(data));
            //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
            byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America");
            System.out.println(new String(data2));
    
    1. 跟新节点
                    client.start();
            client.setData().forPath("/America");
            client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
    
    1. 删除节点
                    client.start();
            //只能删除叶子节点
            client.delete().forPath("/China");
            //删除一个节点,并递归删除其所有子节点
            client.delete().deletingChildrenIfNeeded().forPath("/Russia");
            //强制指定版本进行删除
            client.delete().withVersion(5).forPath("/America");
            //注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
            client.delete().guaranteed().forPath("/America");
    
    1. curator事件监听,监听分为指定节点监听和子节点变化监听,具体代码如下:
        /**
         * 监听指定节点本身的变化,包括节点本身的创建和节点本身数据的变化
         */
        @Test
        public void watch() throws Exception {
            client.start();
            client.create().creatingParentsIfNeeded().forPath("/book/computer", "java".getBytes());
    
            // 添加 NodeCache
            NodeCache nodeCache = new NodeCache(client, "/book/computer");
            // 添加 NodeCache listener
            nodeCache.getListenable().addListener(() -> System.out.println("新的节点数据:" + new String(nodeCache.getCurrentData().getData())));
            nodeCache.start(true);
    
            client.setData().forPath("/book/computer", "c++".getBytes());
        }
    
    
        /**
         * 监听子节点变化情况
         * 1 新增子节点
         * 2 删除子节点
         * 3 子节点数据变更
         */
        @Test
        public void watchChildren() throws Exception{
            client.start();
    
            // 添加 pathChildrenCache
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/book13", true);
            // 添加 pathChildrenCache listener
            pathChildrenCache.getListenable().addListener((client, event) ->  {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("新增子节点:" + event.getData().getPath());
                            break;
                        case CHILD_UPDATED:
                            System.out.println("子节点数据变化:" + event.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            System.out.println("删除子节点:" + event.getData().getPath());
                            break;
                        default:
                            break;
                    }
            });
            pathChildrenCache.start();
    
            client.create().forPath("/book13");
            client.create().forPath("/book13/car", "bmw".getBytes());
        }
    

    相关文章

      网友评论

          本文标题:dubbo源码(七)-spring、zookeeper、nett

          本文链接:https://www.haomeiwen.com/subject/vuuneqtx.html