本篇为elasticsearch源码分析系列文章的第七篇,由于接触到作为客户端和ElasticSearch集群通信,所以本篇以ElasticSearch的Client的通信为出发点,讲解有关Client通信和负载均衡的有关详情,叙述的不好或不对的地方还请大家指正:)
如何与集群(Cluster)通信
使用Node与cluster通信
Node node = NodeBuilder.nodeBuilder().clusterName("yourclustername").client(true).node();
Client client = node.client();
因为该节点是仅仅是作为一个客户端而不用保存数据,所以必须设置client(true)。
使用TransportClient与cluster通信
在5.0.0版本之前可以通过如下的代码来构建TransportClient,
通过指定名称来创建
Settings settings = Settings.settingsBuilder().put("cluster.name", "es-client").put("client.transport.sniff", true).build();
TransportClient transportClient = TransportClient.builder().settings(settings).build();
通过IP地址来创建
TransportClient transportClient = TransportClient.builder().build().addTransportAddress(new InetSocketTransportAddress("192.168.1.100", 9300));
通过同网段嗅探来创建
Settings settings = Settings.settingsBuilder().put("client.transport.sniff", true).build();
TransportClient transportClient = TransportClient.builder().settings(settings).builder();
如果设置client.transport.sniff为true,表示客户端去嗅探整个cluster的状态,把集群中其它机器的ip地址加到客户端中,这样做的好处是一般你不用手动设置集群里所有集群的ip到连接客户端,它会自动帮你添加,并且自动发现新加入集群的机器。
但在5.0.0版本之后新增了PreBuiltTransportClient类,而TransportClient变为Abstract类型,且被PreBuiltTransportClient继承。该类的主要意图是,指定在创建TransportClient时必须加载以下的插件:
- Netty4Plugin
- ReindexPlugin
- PercolatorPlugin
- MustachePlugin
- ParentJoinPlugin
可见这些插件是节点的必备插件。所以5.0.0版本以后的TransportClient的创建方式变为:
PreBuiltTransportClient(Settings settings, Class<? extends Plugin>... plugins);
注:集群名称和嗅探模式可以在Settings中设置,而TransportAddress依旧可以通过addTransportAddress方法来设置。
TransportClient中TransportClient加载插件的代码
加载插件可以看到用到了代理,TransportClient的部分API都是TransportClientNodesService进行代理的
初始化TransportClientNodesServiceclient单次请求流程
客户端请求的详细流程如下:
先是实例化部分:
- 1.PreBuiltTransportClient(Settings, plugins, hostFailureListenter)实例化
- 2.super实例化TransportClient
- 3.TransportClient执行buildTemplate方法
- 4.buildTemplate方法中实例化TransportClientNodesService类的对象nodesService
然后是请求部分:
- 1.请求从AbstractClient的不同请求方法中进入(如bulk,clearScroll,delete,explain,fieldCaps,get,index,multiGet,mutiSearch,multiTermVectors,search,searchScroll,termVectors,update)
- 2.执行AbstractClient的execute(action, Request,listener)
- 3.执行TransportClient的doExecute方法,执行TransportClient中proxy的execute
- 4.执行TransportProxyClient的execute方法
- 5.执行TransportClientNodesService实例nodesService的execute方法
- 6.调用NodeListenerCallback回调方法doWithNode
- 7.执行TransportActionNodeProxy的execute方法
- 8.执行TransportService的sendRequest方法
- 9.TransportService调用sendRequest后的回调依次回传
- TransportActionNodeProxy
- NodeListenerCallback
- TransportClientNodesService
- TransportClient
整个客户端模块的简要流程如下:
- client 提供了客户端的操作接口,比如count()
- 代理端TransportClientNodesService的execute()随机一个节点出来
- 代理端TransportClientNodesService通过transportService发送请求
Client的负载均衡
Client的负载均衡是通过TransportClientNodesService类实现的。TransportClientNodesService实例维护一组DiscoveryNode引用,每次客户端请求的时候,会根据负载均衡算法选中一个节点(DiscoveryNode),发送请求。常用的负载算法有Random,Round robin,Hash,StaticWeighted等。ES的客户端负载使用了Round robin算法。
此外TransportClientNodesService还负责嗅探,维护集群节点列表,选举节点的工作。
注入TransportClientNodesServiceTransportClientNodesService的实例化首先注入了集群名称,线程池,最小兼容版本,客户端传输采样时间间隔,ping超时时间。然后配置了节点采样模型NodeSampler。NodeSampler接口很简单,只有一个sample()方法,它的实现类有2个SniffNodesSampler和SimpleNodeSampler,我们在初始化里已经看到了,如果"sniff"配置项是true的话使用SniffNodesSampler类。
两个实现类如下
- 嗅探同一集群中的所有节点(SniffNodesSampler,client会主动发现集群里的其他节点,即使节点不在配置文件中,会创建fully connect)
- 或者是只关注配置文件配置的节点(SimpleNodeSampler,ping listedNodes(也就是配置中设置的节点)中的所有node,区别在于这里创建的都是light connect)
简单的说,SimpleNodeSampler会限制当前可用client一定是在配置中设置的节点中的,这样的意图是让集群中的某些节点专门用来负责接收用户请求,而SniffNodesSampler会使用所有发现的节点,让其参与负载,即使这个节点不在配置中。
得到集群节点列表后,代理端TransportClientNodesService在每次execute时,就可以通过getNodeNumber方法随机获取节点。
如下图:
execute方法调用getNodeNumber 关键的节点选择代码数据写入
节点是如何写入的呢? 在TransportClient的buildTemplate方法中,实例化TransportService的步骤中,通过networkModule的getRransportInterceptor方法得到的TransportInterceptor实例就是通过nettychannel写入数据的地方,如下图:
实例化TransportClient 实例化TransportService数据还是通过上文的NodeSampler实例来写入的,FutureTransportResponseHandler设置回调操作,如下图:
NodeSampler遍历所有的数据节点,写入到新节点里面
数据写入
网友评论