背景
这里我们简单学习一些spring、zookeeper、netty的知识点,因为在后续的dubbo源码中会涉及到这些。如果我们不知道来龙去脉,在理解dubbo源码上有点困难。
1. spring自定义XML标签及其解析
在基于spring开发中,我们很常见使用xml配置一些属性,例如dubbo的配置:
<dubbo:application name="provider" owner="hui.wang" organization="dubbo-learn"/>
大家可否想过怎么自定义这种配置?怎么使用利用spring读取到自定义的配置?
现在我们开始基于spring自定义标签及其解析。
- 编写一个模型类,模型的属性是配置文件对应的属性。这里我简单的编写一个模型类,取名为
MyXml
,代码如下:
/**
* @author hui.wang09
* @since 17 February 2019
*/
public class MyXml {
private String name;
private String value;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
- 定义xsd文件,这里取名为
myXml.xsd
,代码如下:
<xsd:schema
xmlns="http://hui.wang.com/schema"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://hui.wang.com/schema">
<xsd:complexType name="elementname">
<xsd:attribute name="name" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="value" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:element name="elementname" type="elementname">
<xsd:annotation>
<xsd:documentation><![CDATA[ elementname1的文档 ]]></xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:schema>
- 这里的
targetNamespace
和xmlns
要一致 -
xsd:element
将来会在xml文件中用到的元素,例如<dubbo:application>
中的application
-
xsd:attribute
定义的就是模型类中的属性,例如<dubbo:application name="xxx">
中的name
,并且可以指定属性类型,进而起到检测的作用.
- 编写spring.schemas文件,该文件用来指定xsd文件的位置。我这里配置如下:
http\://hui.wang.com/schema/myXml.xsd=META-INF/myXml.xsd
这里的http\://hui.wang.com/schema/
要与xsd文件中的targetNamespace
一致
- 编写
BeanDefinition
解析器,主要用来解析自定义的xml标签。我这里定义一个类,取名MyXmlBeanDefinitionParser
,代码如下:
/**
* @author hui.wang09
* @since 17 February 2019
*/
public class MyXmlBeanDefinitionParser implements BeanDefinitionParser{
private final Class<?> beanClass;
public MyXmlBeanDefinitionParser(Class<?> beanClass) {
this.beanClass = beanClass;
}
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
beanDefinition.getPropertyValues().add("name", element.getAttribute("name"));
beanDefinition.getPropertyValues().add("value", element.getAttribute("value"));
BeanDefinitionRegistry beanDefinitionRegistry = parserContext.getRegistry();
beanDefinitionRegistry.registerBeanDefinition(beanClass.getName(),beanDefinition);
return beanDefinition;
}
}
- 编写命名空间处理器,主要用来注册BeanDefinition解析器。我这边定义一个类,取名为
MyXmlNamespaceHandler
,代码如下:
/**
* @author hui.wang09
* @since 17 February 2019
*/
@Service
public class MyXmlNamespaceHandler extends NamespaceHandlerSupport{
@Override
public void init() {
registerBeanDefinitionParser("elementname", new MyXmlBeanDefinitionParser(MyXml.class));
}
}
- 编写spring.handlers文件,主要用于关联命名空间处理器和xsd中的targetNamespace。代码如下:
http\://hui.wang.com/schema=com.hui.wang.dubbo.learn.provider.spring.xml.MyXmlNamespaceHandler
- 开始测试,开始编写xml文件,编写一个spring xml文件,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:myXml="http://hui.wang.com/schema"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://hui.wang.com/schema http://hui.wang.com/schema/myXml.xsd">
<myXml:elementname value="hui.wang" name="yes"/>
</beans>
xmlns:myXml="http://hui.wang.com/schema"
是myXml.xsd
文件中的targetNamespace
编写一个测试类,代码如下:
/**
* @author hui.wang09
* @since 17 February 2019
*/
public class MyXmlTest {
public static void main(String[] args) throws Exception{
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("myXml.xml");
MyXml myXml = (MyXml) applicationContext.getBean(MyXml.class.getName());
System.out.println(JSON.json(myXml));
}
}
打印结果如下:
{"name":"yes","value":"hui.wang"}
通过这个例子,大家应该明白了怎么自定义xml配置和怎么解析xml的配置了。
2. spring 对外暴露的接口
这里我们将对InitializingBean
、DisposableBean
、ApplicationContextAware
、ApplicationListener
、BeanNameAware
和ApplicationEventPublisherAware
接口含义和使用。
这里我编写了一个类,实现了上面说的所有接口,取名SpringUtils
,代码如下:
/**
* @author hui.wang09
* @since 15 February 2019
*/
@Service
public class SpringUtils implements InitializingBean, DisposableBean, ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringUtils.class);
/**
* org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
* 这个方法只是简单的返回我们当前的beanName
*/
@Override
public void setBeanName(String s) {
LOGGER.info("========================================================");
LOGGER.info("========================================================");
LOGGER.info("org.springframework.beans.factory.BeanNameAware#setBeanName 调用,s={}", s);
LOGGER.info("========================================================");
LOGGER.info("========================================================");
}
/**
* org.springframework.beans.factory.DisposableBean#destroy()
* 在spring初始化bean的时候,如果bean实现了DisposableBean接口,会自动在销毁时调用destroy方法
*/
@Override
public void destroy() throws Exception {
LOGGER.info("========================================================");
LOGGER.info("========================================================");
LOGGER.info("org.springframework.beans.factory.DisposableBean#destroy()");
LOGGER.info("========================================================");
LOGGER.info("========================================================");
}
/**
* org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
* 在spring初始化bean的时候,如果bean实现了InitializingBean接口,会自动调用afterPropertiesSet方法。
* spring为bean提供了两种初始化bean的方式,实现InitializingBean接口,实现afterPropertiesSet方法,
* 或者在配置文件中同过init-method指定,两种方式可以同时使用
*/
@Override
public void afterPropertiesSet() throws Exception {
LOGGER.info("========================================================");
LOGGER.info("========================================================");
LOGGER.info("org.springframework.beans.factory.InitializingBean#afterPropertiesSet()");
LOGGER.info("========================================================");
LOGGER.info("========================================================");
}
/**
* org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
* 实现该接口的类,可以在spring容器初始化的时候调用setApplicationContext方法,从而获得ApplicationContext中的所有bean。
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
LOGGER.info("========================================================");
LOGGER.info("========================================================");
LOGGER.info("org.springframework.context.ApplicationContextAware#setApplicationContext, id={}, appName={}", applicationContext.getId(), applicationContext.getApplicationName());
LOGGER.info("========================================================");
LOGGER.info("========================================================");
}
/**
* org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
* 这个方法会通知的所有与事件相匹配的监听器。这些监听可能是spring框架的监听器,也有可能是特定的监听器。
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
LOGGER.info("========================================================");
LOGGER.info("========================================================");
LOGGER.info("org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher");
LOGGER.info("========================================================");
LOGGER.info("========================================================");
}
/**
* org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
* 每当ApplicationContext发布ApplicationEvent时,改方法将自动被触发
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
LOGGER.info("========================================================");
LOGGER.info("========================================================");
LOGGER.info("org.springframework.context.ApplicationListener#onApplicationEvent");
LOGGER.info("========================================================");
LOGGER.info("========================================================");
}
}
然后我们启动spring容器,打印结果如下:
22:24:47.592 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.593 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.593 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.beans.factory.BeanNameAware#setBeanName 调用,s=springUtils
22:24:47.596 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.596 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.597 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationContextAware#setApplicationContext, id=org.springframework.web.context.WebApplicationContext:, appName=
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:47.598 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:48.176 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationListener#onApplicationEvent
22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:48.177 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:49.040 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:49.041 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:49.043 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - org.springframework.context.ApplicationListener#onApplicationEvent
22:24:49.044 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
22:24:49.044 [RMI TCP Connection(2)-127.0.0.1] INFO c.h.w.d.l.p.spring.plug.SpringUtils - ========================================================
这里我们通过日志,可以看到这几个方法的调用先后关系和调用时机,同时也可以看到这几个方法被调用时的参数。接下来,我们对这几个接口进行一个简单的说明。
-
InitializingBean
接口,只有一个afterPropertiesSet
方法,在spring初始化bean的时候,如果bean实现了InitializingBean
接口,会自动调用afterPropertiesSet
方法。 -
DisposableBean
接口,只有一个destroy
方法,在spring初始化bean的时候,如果bean实现了DisposableBean
接口,会自动在销毁时调用destroy
方法。 -
ApplicationContextAware
接口,只有一个setApplicationContext
方法,实现该接口的类,可以在spring容器初始化的时候调用setApplicationContext
方法,从而获得ApplicationContext
中的所有bean。 -
ApplicationListener
接口,只有一个接口onApplicationEvent
,每当ApplicationContext
发布ApplicationEvent
时,改方法将自动被触发。 -
BeanNameAware
接口,只有一个接口setBeanName
,这个方法只是简单的返回我们当前的beanName -
ApplicationEventPublisherAware
接口,只有一个接口setApplicationEventPublisher
,这个方法会通知的所有与事件相匹配的监听器。这些监听可能是spring框架的监听器,也有可能是特定的监听器。
上面的几个接口的讲解就到这了。下面说一个不是spring的知识点,也是在后续dubbo源码中会碰到,就是Javassit
。
3. Javassit
说到Javassit
,需要提到动态编程,动态编程是相对于静态编程而言的,平时我们讨论比较多的就是静态编程语言,例如Java,与动态编程语言,例如JavaScript。那二者有什么明显的区别呢?简单的说就是在静态编程中,类型检查是在编译时完成的,而动态编程中类型检查是在运行时完成的。所谓动态编程就是绕过编译过程在运行时进行操作的技术。
在静态语言中引入动态特性,主要是为了解决一些使用场景的痛点。其实完全使用静态编程也办的到,只是付出的代价比较高,没有动态编程来的优雅。例如依赖注入框架Spring使用了反射。
此处我们主要说一下通过动态生成字节码的方式。操作java字节码的工具有两个比较流行,一个是ASM,一个是Javassit 。
-
ASM
:直接操作字节码指令,执行效率高,要是使用者掌握Java类字节码文件格式及指令,对使用者的要求比较高。 -
Javassit
: 提供了更高级的API,执行效率相对较差,但无需掌握字节码指令的知识,对使用者要求较低。
Javassist
是一个开源的分析、编辑和创建Java字节码的类库。Javassist
中最为重要的是ClassPool
,CtClass
,CtMethod
以及 CtField
这几个类。
-
ClassPool
:一个基于HashMap实现的CtClass对象容器,其中键是类名称,值是表示该类的CtClass对象。默认的ClassPool使用与底层JVM相同的类路径,因此在某些情况下,可能需要向ClassPool添加类路径或类字节。 -
CtClass
:表示一个类,这些CtClass
对象可以从ClassPool
获得。 -
CtMethods
:表示类中的方法。 -
CtFields
:表示类中的字段。
下面,编写一个简单的实例,演示一下Javassist
的使用:
- 先编写一个model类,取名为
Student
,代码如下:
public class Student {
}
这里需要说明一下的是,这是一个空的class,没有任何的属性和方法。下面我们就使用Javassist
,进行动态编程。
-
Javassist
的使用代码,代码如下:
public class TestJavassitCompiler {
public static void main(String[] args) throws Exception{
ClassPool pool = ClassPool.getDefault();
CtClass ctClass = pool.makeClass("com.hui.wang.dubbo.learn.provider.javassist.model.Student");
//添加属性:private String name
CtField nameField = new CtField(pool.getCtClass("java.lang.String"), "name", ctClass);
nameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(nameField);
//添加属性:private int age
CtField ageField = new CtField(pool.getCtClass("int"), "age", ctClass);
ageField.setModifiers(Modifier.PRIVATE);
ctClass.addField(ageField);
//getter和setter
ctClass.addMethod(CtNewMethod.getter("getName", nameField));
ctClass.addMethod(CtNewMethod.setter("setName", nameField));
ctClass.addMethod(CtNewMethod.getter("getAge", ageField));
ctClass.addMethod(CtNewMethod.setter("setAge", ageField));
//创建构造器
CtConstructor ctConstructor = new CtConstructor(new CtClass[] {}, ctClass);
String body = new StringBuilder("{\nthis.age = 1;\nthis.name = \"hui.wang\";\n}").toString();
ctConstructor.setBody(body);
ctClass.addConstructor(ctConstructor);
//普通方法
CtMethod ctMethod = new CtMethod(CtClass.voidType, "commonMethod", new CtClass[] {}, ctClass);
ctMethod.setModifiers(Modifier.PUBLIC);
ctMethod.setBody(new StringBuilder("{\n System.out.println(\"this is a common method\"); \n" +
"\n System.out.println(this.getAge()); \n}").toString());
ctClass.addMethod(ctMethod);
Class<?> clazz = ctClass.toClass();
Object obj = clazz.newInstance();
//方法调用
obj.getClass().getMethod("commonMethod", new Class[] {}).invoke(obj, new Object[] {});
}
}
打印结果为:
this is a common method
1
4. 关于netty3简单使用
在dubbo中,默认是使用netty3进行通讯的,所以接下来简单介绍一下netty3怎么使用的,网络通讯分为server和client端,下来我们用netty分别创建server和client
server端
- 先创建一个server类,代码如下:
/**
* netty3 server端
*
* @author hui.wang09
* @since 22 February 2019
*/
public class NettyServer {
/**
* 1、首先创建了NioServerSocketChannelFactory:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个)
* 2、创建ServerBootstrap server端启动辅助类
* 3、为ServerBootstrap设置ChannelPipelineFactory工厂,并为ChannelPipelineFactory将来创建出的ChannelPipeline设置编码器/解码器/事件处理器
* 4、使用ServerBootstrap绑定监听地址和端口
*/
public void start() {
ChannelFactory factory = new NioServerSocketChannelFactory(
// boss线程池
Executors.newCachedThreadPool(),
// worker线程池
Executors.newCachedThreadPool(),
8
);
ServerBootstrap bootstrap = new ServerBootstrap(factory);
/**
* 对于每一个连接channel, server都会调用PipelineFactory为该连接创建一个ChannelPipline
*/
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new ServerLogicHandler());
return pipeline;
}
});
Channel channel = bootstrap.bind(new InetSocketAddress("127.0.0.1", 8088));
System.out.println("server start success");
}
public static void main(String[] args) throws InterruptedException {
NettyServer server = new NettyServer();
server.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
使用netty3
创建server端的步骤如下:
- 首先创建了
NioServerSocketChannelFactory
:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个) - 创建
ServerBootstrap
server端启动辅助类 - 为
ServerBootstrap
设置ChannelPipelineFactory
工厂,并为ChannelPipelineFactory
将来创建出的ChannelPipeline
设置编码器/解码器/事件处理器 - 使用
ServerBootstrap
绑定监听地址和端口
- 设置自己的事件处理器,就是创建server的这句话
pipeline.addLast("handler", new ServerLogicHandler());
,其中ServerLogicHandler
就是自己的事件处理器,代码如下:
/**
* 事件处理器
* 监听与客户端连接成功事件
* 监听接收到来自客户端的消息,之后写回给客户端消息
* 捕捉异常事件
*
* @author hui.wang09
* @since 22 February 2019
*/
public class ServerLogicHandler extends SimpleChannelHandler{
/**
* 监听接收到来自客户端的消息,之后写回给客户端消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
String msg = (String) e.getMessage();
System.out.println("接收到了client的消息, msg: " + msg);
Channel channel = e.getChannel();
String str = "hi, client";
//写消息发给client端
channel.write(str);
System.out.println("服务端发送数据: " + str + "完成");
}
/**
* 捕捉异常事件
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
/**
* 监听与客户端连接成功事件
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("连接成功, channel: " + e.getChannel().toString());
}
}
这个类的注释很多,这里不做说明,接下来就是client端了
client端
- 和server端类似,先用netty3创建client端,代码如下:
/**
* netty3 client
*
* @author hui.wang09
* @since 22 February 2019
*/
public class NettyClient {
/**
* 1、首先创建了NioClientSocketChannelFactory:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个)
* 2、创建ClientBootstrap client端启动辅助类
* 3、为ClientBootstrap设置ChannelPipelineFactory工厂,并为ChannelPipelineFactory将来创建出的ChannelPipeline设置编码器/解码器/事件处理器
* 4、使用ClientBootstrap连接Server端监听的地址和端口
*/
public static void main(String[] args) {
ChannelFactory channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
8
);
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new ClientLogicHandler());
return pipeline;
}
});
bootstrap.connect(new InetSocketAddress("127.0.0.1", 8088));
System.out.println("client start success");
}
}
使用netty3创建步骤:
- 首先创建了
NioClientSocketChannelFactory
:创建boss线程池,创建worker线程池以及worker线程数。(boss线程数默认为1个) - 创建
ClientBootstrap
client端启动辅助类 - 为
ClientBootstrap
设置ChannelPipelineFactory
工厂,并为ChannelPipelineFactory
将来创建出的ChannelPipeline
设置编码器/解码器/事件处理器 - 使用
ClientBootstrap
连接Server端监听的地址和端口
- 设置自己的事件处理器,就是创建client的这句话
pipeline.addLast("handler", new ClientLogicHandler());
,其中ClientLogicHandler
就是自己的事件处理器,代码如下:
/**
* 事件处理器
*
* 监听与服务端连接成功事件,连接成功后,写消息给服务端
* 监听向服务端写消息完成的事件
* 监听接收到来自服务端的消息
* 捕捉异常事件
*
* @author hui.wang09
* @since 22 February 2019
*/
public class ClientLogicHandler extends SimpleChannelHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("客户端连接成功!");
String str = "hi server!";
//异步
e.getChannel().write(str);
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
System.out.println("客户端写消息完成");
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
String msg = (String) e.getMessage();
System.out.println("客户端接收到消息, msg: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
到这里,server端和client端编写完成,分别启动sever端和client端,查看控制台打印结果:
- server端:
server start success
连接成功, channel: [id: 0xcd07c136, /127.0.0.1:53966 => /127.0.0.1:8088]
接收到了client的消息, msg: hi server!
服务端发送数据: hi, client完成
- client端:
client start success
客户端连接成功!
客户端写消息完成
客户端接收到消息, msg: hi, client
5. zookeeper的Curator简单使用
使用zookeeper原生API实现一些复杂的东西比较麻烦。所以,出现了两款比较好的开源客户端,对zookeeper的原生API进行了包装:zkClient和curator。后者是Netflix出版的,必属精品,也是最好用的zk的开源客户端。
Curator使用示例
- 添加maven依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
- 使用Curator连接zookeeper
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
参数说明:
-
connectString
:zk的server地址,多个server之间使用英文逗号分隔开 -
connectionTimeoutMs
:连接超时时间,如上是30s,默认是15s -
sessionTimeoutMs
:会话超时时间,如上是50s,默认是60s -
retryPolicy
:失败重试策略
关于重试策略 ExponentialBackoffRetry
:构造器含有三个参数ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
-
baseSleepTimeMs
:初始的sleep时间,用于计算之后的每次重试的sleep时间,
计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1))) -
maxRetries
:最大重试次数 -
maxSleepMs
:最大sleep时间
- 创建节点
// 创建回话
client.start();
//创建一个初始内容为空的节点
client.create().forPath("/China");
client.create().forPath("/America", "zhangsan".getBytes());
//创建一个初始内容为空的临时节点
client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");
//递归创建,/Russia是持久节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());
- 获取节点内容
client.start();
byte[] data = client.getData().forPath("/America");
System.out.println(new String(data));
//传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America");
System.out.println(new String(data2));
- 跟新节点
client.start();
client.setData().forPath("/America");
client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
- 删除节点
client.start();
//只能删除叶子节点
client.delete().forPath("/China");
//删除一个节点,并递归删除其所有子节点
client.delete().deletingChildrenIfNeeded().forPath("/Russia");
//强制指定版本进行删除
client.delete().withVersion(5).forPath("/America");
//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
client.delete().guaranteed().forPath("/America");
- curator事件监听,监听分为指定节点监听和子节点变化监听,具体代码如下:
/**
* 监听指定节点本身的变化,包括节点本身的创建和节点本身数据的变化
*/
@Test
public void watch() throws Exception {
client.start();
client.create().creatingParentsIfNeeded().forPath("/book/computer", "java".getBytes());
// 添加 NodeCache
NodeCache nodeCache = new NodeCache(client, "/book/computer");
// 添加 NodeCache listener
nodeCache.getListenable().addListener(() -> System.out.println("新的节点数据:" + new String(nodeCache.getCurrentData().getData())));
nodeCache.start(true);
client.setData().forPath("/book/computer", "c++".getBytes());
}
/**
* 监听子节点变化情况
* 1 新增子节点
* 2 删除子节点
* 3 子节点数据变更
*/
@Test
public void watchChildren() throws Exception{
client.start();
// 添加 pathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/book13", true);
// 添加 pathChildrenCache listener
pathChildrenCache.getListenable().addListener((client, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("新增子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("子节点数据变化:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
default:
break;
}
});
pathChildrenCache.start();
client.create().forPath("/book13");
client.create().forPath("/book13/car", "bmw".getBytes());
}
网友评论