平台内的产品有一个数据分析,统计平台内某个商户某个时间段内(今天、昨天、7天内、30天内……)的各种数据分析,这种分析显然用MySql的count、sum、GroupBy之类的去查询是很不靠谱的,尤其是在数据量很大的情况下效率就不言而喻了,本来想着用HBase的MR来做,或者直接把各纬度的数据通过HADOOP的MR处理完存到HBase里面,后来与朋友聊天后被朋友严重鄙视了一顿,鄙视的内容基本是嫌弃我们的数据量太小根本用不到HBase更用不到MR,在朋友的极力蛊惑之下决定用ElasticSearch来实现以下简称ES,好吧,那我们还是从传统的搭建-采坑-填坑-再采坑-再填坑开始。
1、下载并安装配置ElasticSearch
ElasticSearch的官网http://www.elastic.co/products/elasticsearch找到需要的版本,我这里选择的是5.6.9的版本,不要问我为什么,因为最新版在我的未知领域有更多的坑!直接下载5.6.9是我目前用着最舒服的一个版本,ES依赖最低JDK1.8版本,所以环境变量一定要配置好
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.9.tar.gz
tar -zxvf elasticsearch-5.6.9.tar.gz -C /usr/local/
cd /usr/local/elasticsearch-5.6.9
ES的配置文件全部在config目录下,其中elasticsearch.yml是主配置文件,进去后主要修改几个地方,其他的地方根据业务需求自行修改:
vim config/其中elasticsearch.yml
1
2
3
4
5
6
cluster.name=tsk-es
node.name=tsk1
path.data: /opt/data/elastic/data
path.logs: /opt/data/elastic/log
network.host: 0.0.0.0
http.port: 9200
不要以为把elasticsearch.yml修改完就可以直接执行bin目录下的elasticsearch,会报一堆错误给你的!第一个错误就是告诉你不能用root用户启动ES,所以你要先创建一个用户,我这里创建一个叫elastic的用户然后记得给用户授文件夹的权限,然后su进入用户启动,但是先别急着进elastic用户,还有些东西需要改一下:
1、修改/etc/security/limits.conf文件,否则会报max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]错误
vim /etc/security/limits.conf
1
2
3
4
* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096
2、修改/etc/sysctl.conf 文件否则会报max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]错误
vim /etc/sysctl.conf
1
vm.max_map_count=262144
全部修改完成之后就可以进入elastic用户启动ES了
su elastic
bin/elasticsrarch
如果不出意外的话你的ES应该正常运行了,这时用浏览器访问192.168.0.1:9200你就会看到一串JSON字符串,证明你的ES已经启动成功,如果想要后台运行ES直接执行
nohup bin/elasticsearch > /opt/data/elastic/elastic.log 2>&1 &
2、ES操作初了解
ES的操作都是通过HTTP请求进行的,不同的请求方式和参数针对不同的操作,比如创建一个索引就是PUT,删除一个索引用的是DELETE,查询如果没参数直接用GET就好,如果有参数或者是提交数据的话用POST,那么我们第一步肯定是先创建索引开始:
PUT:http://192.168.0.1:9200/shopsinfo
{
"mappings":{
"shopsOrder":{
"properties":{
"shopid":{
"type":"string",
"index": "not_analyzed"
},
"createdate":{
"type":"string",
"index": "not_analyzed"
},
"timestamp":{
"type":"long"
},
"paymentType":{
"type":"string" ,
"index": "not_analyzed"
},
"amount":{
"type":"long"
}
}
}
}
}
上面的意思是创建一个名叫shopsinfo的索引,里面有一个叫shopsOrder的mapping,里面有shopid,createdata,timestamp,paymentType,amount几个字段,以及分别对应的type
插入数据比较简单,就是POST就好参数就是一个JSON
POST:http://192.168.0.1:9200/shopsinfo/shopsOrder
{
"shopid": "96119",
"createdate": "20180410",
"timestamp": 1523289600000,
"paymentType": "alipay",
"amount": 6917
}
删除数据
POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_delete_by_query
查询
GET/POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search
关于ES的操作和其他不再过多赘述,官网有中文版,度娘上也一大堆,重中之重是ES的统计查询这是ES的关键
3、重点之查询
ES是属于倒排索引,查询的效率特别的高,但是ES的查询语句就很麻烦了,ES不管是查询、统计都是用POST的BODY以JSON的形式进行的,比如我要查询时间戳>某个时间并且shopId为100000002和100000006的在SQL中是这样的:
select * from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")
在ES中就得这么查:
POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search
{
"size":20,
"query":{
"bool":{
"must":[
{
"range":{
"timestamp":{
"gte":1523671189000
}
}
},
{
"terms":{
"shopid":["100000002","100000006"]
}
}
]
}
}
}
这里面我传递了size参数,如果不传,ES默认给你返回10条数据,查询结果ES也会给你返回JSON,其中hits字段中会有total就是你查询的结果的总数hits会返回给你结果内容。
以上是简单的查询,统计的话ES是以aggs作为参数,全称应该叫做Aggregation,比如接着刚才的查询我想计算出结果的amount总额是多少就是类似SQL中的
select sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")
在ES中就得这么查
{
"aggs":{
"query_amount":{
"sum":{
"field":"amount"
}
}
},
"query":{
"bool":{
"must":[
{
"range":{
"timestamp":{
"gte":1523671189000
}
}
},
{
"terms":{
"shopid":["100000002","100000006"]
}
}
]
}
}
}
统计的结果在返回值的aggregations参数里的query_amount下类似这样的:
......
"aggregations": {
"query_amount": {
"value": 684854
}
}
......
接下来再复杂一点点,按天分组进行统计查询SQL中的提现是这样的:
select createdate,sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")
group by createdate order by createdate
在ES中是这样的:
{
"size":0,
"aggs":{
"orderDate":{
"terms":{
"field":"createdate",
"order":{
"_term":"asc"
}
},
"aggs":{
"query_amount":{
"sum":{
"field":"amount"
}
}
}
}
},
"query":{
"bool":{
"must":[
{
"range":{
"timestamp":{
"gte":1523671189000
}
}
},
{
"terms":{
"shopid":["100000002","100000006"]
}
}
]
}
}
}
查询结果为
......
"aggregations": {
"orderDate": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 99,
"buckets": [
......
{
"key": "20180415",
"doc_count": 8,
"query_amount": {
"value": 31632
}
},
{
"key": "20180417",
"doc_count": 3,
"query_amount": {
"value": 21401
}
},
{
"key": "20180418",
"doc_count": 2,
"query_amount": {
"value": 2333
}
}
......
]
}
}
buckets中就是查询的结果,key为按我createdate分组后的值,doc_count类似count,query_amount为sum后的值。至于我的参数里面有一个size:0是因为我不需要具体的记录就是hits,所以这里传0
最后我们来个更复杂的1、统计所有的总额;2、先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额
{
"size":0,
"aggs":{
"amount":{
"sum":{
"field":"amount"
}
},
"paymenttype":{
"terms":{
"field":"paymentType"
},
"aggs":{
"query_amount":{
"sum":{
"field":"amount"
}
},
"payment_date":{
"terms":{
"field":"createdate"
},
"aggs":{
"query_amount":{
"sum":{
"field":"amount"
}
}
}
}
}
}
},
"query":{
"bool":{
"must":[
{
"range":{
"timestamp":{
"gte":1523671189000
}
}
},
{
"terms":{
"shopid":["100000002","100000006"]
}
}
]
}
}
}
查询结果为:
......
"amount": {
"value": 684854
},
"paymenttype":{
......
"buckets": [
{
"key": "wechatpay",
"doc_count": 73,
"amount": {
"value": 351142
},
"payment_date": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 25,
"buckets": [
......
{
"key": "20180415",
"doc_count": 6,
"amount": {
"value": 29032
}
},
{
"key": "20180425",
"doc_count": 6,
"amount": {
"value": 21592
}
}
......
]
}
},
{
"key": "alipay",
"doc_count": 67,
"amount": {
"value": 333712
},
"payment_date": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 23,
"buckets": [
......
{
"key": "20180506",
"doc_count": 8,
"amount": {
"value": 38280
}
},
{
"key": "20180426",
"doc_count": 6,
"amount": {
"value": 41052
}
}
......
]
}
}
]
}
4、JAVA操作ES
根据自己下载的ES版本下载对应版本的JAR包,我安装的是5.6.9所以我的JAR包版本也应该是5.6.9
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.6.9</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.9</version>
</dependency>
创建一个helper操作ES,由于我的ES项目是基于SpringBoot的所以我的helper决定交由spring去管理,其实写一个单例也是可以的,先创建client连接
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.net.InetAddress;
Settings settings = Settings.builder().put("cluster.name", "tsk-es").put("client.transport.sniff", true)
.build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddresses(new InetSocketTransportAddress(InetAddress.getByName(HOST), PORT));
插入数据比较简单你可以直接插入JSON字符串,也可以传递JAVA BEAN
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.XContentType;
IndexResponse response = client.prepareIndex(index, mapping).setSource(jsonStr, XContentType.JSON)
.get();
查询就比较麻烦了,已上面最后一个查询先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额为例:
public void getAmountData(Long startTimestamp, String... shopIds) {
// 先初始化一个SearchRequestBuilder,指向shopsInfo/shopsOrder
SearchRequestBuilder sbuilder = client.prepareSearch("shopsinfo").setTypes("shopsOrder");
// 条件一
TermsQueryBuilder mpq = QueryBuilders.termsQuery("shopid", shopIds);
// 条件二
RangeQueryBuilder mpq2 = QueryBuilders.rangeQuery("timestamp").gte(startTimestamp);
// 初始化QueryBuilder
QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(mpq).must(mpq2);
// 将QueryBuilder放入SearchRequestBuilder
sbuilder.setQuery(queryBuilder);
sbuilder.setSize(0);
// sum,这里创建一个实例全部用这个实例就行
SumAggregationBuilder salaryAgg = AggregationBuilders.sum("query_amount").field("amount");
TermsAggregationBuilder paymentAgg = AggregationBuilders.terms("paymentType").field("paymentType");
paymentAgg.size(100);
paymentAgg.subAggregation(salaryAgg);
TermsAggregationBuilder groupDateAff = AggregationBuilders.terms("payment_date").field("createdate")
.order(Order.term(true));
groupDateAff.size(100);
groupDateAff.subAggregation(salaryAgg);
paymentAgg.subAggregation(groupDateAff);
// 将统计查询放入SearchRequestBuilder
sbuilder.addAggregation(salaryAgg).addAggregation(paymentAgg);
SearchResponse response = sbuilder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().asMap();
// 获取全部的总额
InternalSum shopGroupAllAmount = (InternalSum) aggMap.get("amount");
Double amount = shopGroupAllAmount.getValue();
......
}
SearchResponse中就已经可以获取到全部的信息,至于后续怎么解析数据那就看具体业务需求以及每个人的习惯。ES的各种操作说简单也简单说复杂也复杂,按照朋友的话说就是用熟了自然就简单,确实也是这样,不管用啥都得深入了解一下,要不然自己就是个坑!比如我在做统计查询的时候返回的结果总是比应该的结果要少很多,而少的数量总是出现在sum_other_doc_count这个字段里,研究了半天才发现原来统计的结果也需要传递size参数,否则一样默认10条!
最后,还是要感谢一下我那朋友的,虽然他对我想构建各种大数据平台的事情嗤之以鼻(因为我们数据量确实不大),但依然输出了一下他所能想到的最优解。
网友评论