Python3操作Elasticsearch
创建连接
- 指定连接
es = Elasticsearch(
['172.16.153.129:9200'],
# 认证信息
# http_auth=('elastic', 'changeme')
)
#指定参数
es = Elasticsearch([
{'host': 'localhost'},
{'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True},
])
- 动态连接
es = Elasticsearch(
['esnode1:port', 'esnode2:port'],
# 在做任何操作之前,先进行嗅探
sniff_on_start=True,
# 节点没有响应时,进行刷新,重新连接
sniff_on_connection_fail=True,
# 每 60 秒刷新一次
sniffer_timeout=60
)
- 使用SSL
es = Elasticsearch(
['localhost:443', 'other_host:443'],
#打开SSL
use_ssl=True,
#确保我们验证了SSL证书(默认关闭)
verify_certs=True,
#提供CA证书的路径
ca_certs='/path/to/CA_certs',
#PEM格式的SSL客户端证书
client_cert='/path/to/clientcert.pem',
#PEM格式的SSL客户端密钥
client_key='/path/to/clientkey.pem'
)
据取相关信息
- 测试集群是否启动
In [40]: es.ping()
Out[40]: True
- 获取集群基本信息
In [39]: es.info()
Out[39]:
{'cluster_name': 'sharkyun',
'cluster_uuid': 'rIt2U-unRuG0hJBt6BXxqw',
'name': 'master',
'tagline': 'You Know, for Search',
'version': {'build_date': '2017-10-06T20:33:39.012Z',
'build_hash': '1a2f265',
'build_snapshot': False,
'lucene_version': '6.6.1',
'number': '5.6.3'}}
- 获取集群的健康状态信息
In [41]: es.cluster.health()
Out[41]:
{'active_primary_shards': 6,
'active_shards': 6,
'active_shards_percent_as_number': 50.0,
'cluster_name': 'sharkyun',
'delayed_unassigned_shards': 0,
'initializing_shards': 0,
'number_of_data_nodes': 1,
'number_of_in_flight_fetch': 0,
'number_of_nodes': 1,
'number_of_pending_tasks': 0,
'relocating_shards': 0,
'status': 'yellow',
'task_max_waiting_in_queue_millis': 0,
'timed_out': False,
'unassigned_shards': 6}
- 获取当前连接的集群节点信息
In [43]: es.cluster.client.info()
- 获取集群目前所有的索引
In [55]: print(es.cat.indices())
yellow open logstash-2017.11.04 Zt2K7k0yRZaIwmEsZ9H3DA 5 1 301000 0 162.3mb 162.3mb
yellow open .kibana 1Epb3nPFRimFJoRwKHtXIg 1 1 2 0 13.4kb 13.4kb\
- 获取集群的更多信息
es.cluster.stats()
- 利用实例的 cat 属性得到更简单易读的信息
In [85]: es.cat.health()
Out[85]: '1510431262 04:14:22 sharkyun yellow 1 1 6 6 0 0 6 0 - 50.0%\n'
In [86]: es.cat.master()
Out[86]: 'VXgFbKAaTtGO5a1QAfdcLw 172.16.153.129 172.16.153.129 master\n'
In [87]: es.cat.nodes()
Out[87]: '172.16.153.129 27 49 0 0.02 0.01 0.00 mdi * master\n'
In [88]: es.cat.indices()
Out[88]: 'yellow open logstash-2017.11.04 Zt2K7k0yRZaIwmEsZ9H3DA 5 1 301000 0 162.3mb 162.3mb\nyellow open .kibana 1Epb3nPFRimFJoRwKHtXIg 1 1 2 0 13.4kb 13.4kb\n'
In [89]: es.cat.count()
Out[89]: '1510431323 04:15:23 301002\n'
In [90]: es.cat.plugins()
Out[90]: ''
In [91]: es.cat.templates()
Out[91]: 'logstash logstash-* 0 50001\nfilebeat filebeat-* 0 \n'
- 任务
es.tasks.get()
es.tasks.list()
使用 elasticsearch_dsl 模块
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl import Search
es = connections.create_connection(hosts=['10.0.122.148'])
ser = Search(
using = es,
index = "rourou1-filebeat-7.4.2-2019.11.22").filter(
"match" , clientip="110.102.248.210"
).query(
"match" , response="416"
)
res = ser.execute()
print(res)
print(ser.count())
print(ser.to_dict())
--------------------------------------------------
<Response: [<Hit(rourou1-filebeat-7.4.2-2019.11.22/qsDykW4BqZzlViCDQgjG): {'verb': 'GET', 'clientip': '110.102.248.210', 'ecs': {'vers...}> ]>
7
{'query': {'bool': {'filter': [{'match': {'clientip': '110.102.248.210'}}], 'must': [{'match': {'response': '416'}}]}}
using
指明用那个已经连接的对象
filter
接收的是过滤语句 ,过滤的条件意思是在返回结果中有这些条件的信息
query
接收的是查询体语句
exclude
接收的是不匹配的字段 就像 must_not
ser.count()
统计查询结果的条数
ser.to_dict()
返回被发送信息的JSON格式的数据
连接相关
- 显式传递一个连接
如果你不想提供全局配置(也就是默认连接),你可以传入你自己的连接(实例elasticsearch.Elasticsearch)作为参数, 使用 using 接受它:
s = Search(using=Elasticsearch('localhost'))
甚至你可以下面的方式来覆盖一个对象已经关联的任何连接
s = s.using(Elasticsearch('otherhost:9200'))
- 默认链接
要定义全局使用的默认连接,请使用 connections模块和create_connection方法:
from elasticsearch_dsl.connections import connections
client = connections.create_connection(hosts=['172.16.153.129:9200'],
http_auth=('elastic', 'changeme'), timeout=20)
- 多集群环境的连接
from elasticsearch_dsl.connections import connections
clients = connections.configure(
default={'hosts': 'localhost'},
dev={
'hosts': ['esdev1.example.com:9200'],
'sniff_on_start': True
}
)
当然,上面的情况是适用于第一次连接时的情况,若要在运行时设置连接,使用下面的方法
# if you have configuration to be passed to Elasticsearch.__init__
# 直接传递一个配置信息给 Elasticsearch
connections.create_connection('qa', hosts=['esqa1.example.com'], sniff_on_start=True)
# if you already have an Elasticsearch instance ready
# 追加一个已经准备好的连接对象
connections.add_connection('qa', my_client)
Search DSL
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import MultiMatch, Match
#m = Match(clientip={"query": "123.244.101.255"})
#s = Search.query(m)
mm = MultiMatch(
# 被搜索字段的内容
query="123.244.101.255",
# 被搜索的字典query会匹配fields中的每个字段
fields=["clientip", 'timestamp'])
s = Search().query(mm)
调用Search函数的query方法得到<class 'elasticsearch_dsl.search.Search'>对象
Q查询
您可以使用Q快捷方式可以把带参数的名称或原始数据的dict构建成 Search 对应类的实例:
from elasticsearch_dsl import Q
Q("multi_match", query='python django', fields=['title', 'body'])
Q({"multi_match": {"query": "python django", "fields": ["title", "body"]}})
# 这两种方式最后转换的结果是一致的
MultiMatch(fields=['title', 'body'], query='python django')
要将查询添加到Search对象,请使用以下.query()方法:
q = Q("multi_match", query='python django', fields=['title', 'body'])
s = s.query(q)
当然,Q 接收的参数,,query 方法都支持
s = s.query("multi_match", query='python django', fields=['title', 'body'])
用 Q 实现组合查询
Q 对象可以使用逻辑运算符进行组合:
Q("match", title='python') | Q("match", title='django')
# {"bool": {"should": [...]}}
# 匹配到任意条件即可
Q("match", title='python') & Q("match", title='django')
# {"bool": {"must": [...]}}
# 列表里的条件必须同时匹配
~Q("match", title="python")
# {"bool": {"must_not": [...]}}
# 非
网友评论