美文网首页我爱编程
转~ElasticSearch的搭建与数据统计

转~ElasticSearch的搭建与数据统计

作者: Chting | 来源:发表于2018-05-17 09:23 被阅读0次

     平台内的产品有一个数据分析,统计平台内某个商户某个时间段内(今天、昨天、7天内、30天内……)的各种数据分析,这种分析显然用MySql的count、sum、GroupBy之类的去查询是很不靠谱的,尤其是在数据量很大的情况下效率就不言而喻了,本来想着用HBase的MR来做,或者直接把各纬度的数据通过HADOOP的MR处理完存到HBase里面,后来与朋友聊天后被朋友严重鄙视了一顿,鄙视的内容基本是嫌弃我们的数据量太小根本用不到HBase更用不到MR,在朋友的极力蛊惑之下决定用ElasticSearch来实现以下简称ES,好吧,那我们还是从传统的搭建-采坑-填坑-再采坑-再填坑开始。

    1、下载并安装配置ElasticSearch

     ElasticSearch的官网http://www.elastic.co/products/elasticsearch找到需要的版本,我这里选择的是5.6.9的版本,不要问我为什么,因为最新版在我的未知领域有更多的坑!直接下载5.6.9是我目前用着最舒服的一个版本,ES依赖最低JDK1.8版本,所以环境变量一定要配置好

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.9.tar.gz

    tar -zxvf elasticsearch-5.6.9.tar.gz -C /usr/local/

    cd /usr/local/elasticsearch-5.6.9

     ES的配置文件全部在config目录下,其中elasticsearch.yml是主配置文件,进去后主要修改几个地方,其他的地方根据业务需求自行修改:

    vim config/其中elasticsearch.yml

    1

    2

    3

    4

    5

    6

    cluster.name=tsk-es

    node.name=tsk1

    path.data: /opt/data/elastic/data

    path.logs: /opt/data/elastic/log

    network.host: 0.0.0.0

    http.port: 9200

     不要以为把elasticsearch.yml修改完就可以直接执行bin目录下的elasticsearch,会报一堆错误给你的!第一个错误就是告诉你不能用root用户启动ES,所以你要先创建一个用户,我这里创建一个叫elastic的用户然后记得给用户授文件夹的权限,然后su进入用户启动,但是先别急着进elastic用户,还有些东西需要改一下:

     1、修改/etc/security/limits.conf文件,否则会报max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]错误

    vim /etc/security/limits.conf

    1

    2

    3

    4

    * soft nofile 65536

    * hard nofile 65536

    * soft nproc 2048

    * hard nproc 4096

     2、修改/etc/sysctl.conf 文件否则会报max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]错误

    vim /etc/sysctl.conf

    1

    vm.max_map_count=262144

     全部修改完成之后就可以进入elastic用户启动ES了

    su elastic

    bin/elasticsrarch

     如果不出意外的话你的ES应该正常运行了,这时用浏览器访问192.168.0.1:9200你就会看到一串JSON字符串,证明你的ES已经启动成功,如果想要后台运行ES直接执行

    nohup bin/elasticsearch > /opt/data/elastic/elastic.log 2>&1 &

    2、ES操作初了解

     ES的操作都是通过HTTP请求进行的,不同的请求方式和参数针对不同的操作,比如创建一个索引就是PUT,删除一个索引用的是DELETE,查询如果没参数直接用GET就好,如果有参数或者是提交数据的话用POST,那么我们第一步肯定是先创建索引开始:

    PUT:http://192.168.0.1:9200/shopsinfo

    {

    "mappings":{

        "shopsOrder":{

            "properties":{

                "shopid":{

                    "type":"string",

                    "index": "not_analyzed"

                },

                "createdate":{

                    "type":"string",

                    "index": "not_analyzed"

                },

                "timestamp":{

                    "type":"long"   

                },

                "paymentType":{

                    "type":"string"    ,

                    "index": "not_analyzed"

                },

                "amount":{

                    "type":"long"

                }

            }

        }

    }

    }

     上面的意思是创建一个名叫shopsinfo的索引,里面有一个叫shopsOrder的mapping,里面有shopid,createdata,timestamp,paymentType,amount几个字段,以及分别对应的type

     插入数据比较简单,就是POST就好参数就是一个JSON

    POST:http://192.168.0.1:9200/shopsinfo/shopsOrder

    {

        "shopid": "96119",

        "createdate": "20180410",

        "timestamp": 1523289600000,

        "paymentType": "alipay",

        "amount": 6917

    }

     删除数据

    POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_delete_by_query

     查询

    GET/POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search

     关于ES的操作和其他不再过多赘述,官网有中文版,度娘上也一大堆,重中之重是ES的统计查询这是ES的关键

    3、重点之查询

     ES是属于倒排索引,查询的效率特别的高,但是ES的查询语句就很麻烦了,ES不管是查询、统计都是用POST的BODY以JSON的形式进行的,比如我要查询时间戳>某个时间并且shopId为100000002和100000006的在SQL中是这样的:

    select * from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

     在ES中就得这么查:

    POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search

    {

        "size":20,

        "query":{

            "bool":{

                "must":[

                    {

                        "range":{

                            "timestamp":{

                                "gte":1523671189000

                            }

                        }

                    },

                    {

                        "terms":{

                            "shopid":["100000002","100000006"]

                        }

                    }

                ]

            }

        }

    }

     这里面我传递了size参数,如果不传,ES默认给你返回10条数据,查询结果ES也会给你返回JSON,其中hits字段中会有total就是你查询的结果的总数hits会返回给你结果内容。

     以上是简单的查询,统计的话ES是以aggs作为参数,全称应该叫做Aggregation,比如接着刚才的查询我想计算出结果的amount总额是多少就是类似SQL中的

    select sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

     在ES中就得这么查

    {

        "aggs":{

            "query_amount":{

                "sum":{

                    "field":"amount"

                }

            }

        },

        "query":{

            "bool":{

                "must":[

                    {

                        "range":{

                            "timestamp":{

                                "gte":1523671189000

                            }

                        }

                    },

                    {

                        "terms":{

                            "shopid":["100000002","100000006"]

                        }

                    }

                ]

            }

        }

    }

     统计的结果在返回值的aggregations参数里的query_amount下类似这样的:

    ......

    "aggregations": {

        "query_amount": {

            "value": 684854

        }

    }

    ......

     接下来再复杂一点点,按天分组进行统计查询SQL中的提现是这样的:

    select createdate,sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

    group by createdate order by createdate

     在ES中是这样的:

    {

        "size":0,

        "aggs":{

            "orderDate":{

                "terms":{

                    "field":"createdate",

                    "order":{

                        "_term":"asc"

                    }

                },

                "aggs":{

                    "query_amount":{

                        "sum":{

                            "field":"amount"

                        }

                    }

                }

            }

        },

        "query":{

            "bool":{

                "must":[

                    {

                        "range":{

                            "timestamp":{

                                "gte":1523671189000

                            }

                        }

                    },

                    {

                        "terms":{

                            "shopid":["100000002","100000006"]

                        }

                    }

                ]

            }

        }

    }

     查询结果为

    ......

    "aggregations": {

        "orderDate": {

            "doc_count_error_upper_bound": 0,

            "sum_other_doc_count": 99,

            "buckets": [

                ......

                {

                    "key": "20180415",

                    "doc_count": 8,

                    "query_amount": {

                        "value": 31632

                    }

                },

                {

                    "key": "20180417",

                    "doc_count": 3,

                    "query_amount": {

                        "value": 21401

                    }

                },

                {

                    "key": "20180418",

                    "doc_count": 2,

                    "query_amount": {

                        "value": 2333

                    }

                }

                ......

            ]

        }

    }

     buckets中就是查询的结果,key为按我createdate分组后的值,doc_count类似count,query_amount为sum后的值。至于我的参数里面有一个size:0是因为我不需要具体的记录就是hits,所以这里传0

     最后我们来个更复杂的1、统计所有的总额;2、先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额

        {

        "size":0,

        "aggs":{

            "amount":{

                "sum":{

                    "field":"amount"

                }

            },

            "paymenttype":{

                "terms":{

                    "field":"paymentType"

                },

                "aggs":{

                    "query_amount":{

                        "sum":{

                            "field":"amount"

                        }

                    },

                    "payment_date":{

                        "terms":{

                            "field":"createdate"

                        },

                        "aggs":{

                            "query_amount":{

                                "sum":{

                                    "field":"amount"

                                }

                            }

                        }

                    }

                }

            }

        },

        "query":{

            "bool":{

                "must":[

                    {

                        "range":{

                            "timestamp":{

                                "gte":1523671189000

                            }

                        }

                    },

                    {

                        "terms":{

                            "shopid":["100000002","100000006"]

                        }

                    }

                ]

            }

        }

    }

     查询结果为:

    ......

    "amount": {

        "value": 684854

    },

    "paymenttype":{

        ......

        "buckets": [

            {

                "key": "wechatpay",

                "doc_count": 73,

                "amount": {

                    "value": 351142

                },

                "payment_date": {

                    "doc_count_error_upper_bound": 0,

                    "sum_other_doc_count": 25,

                    "buckets": [

                        ......

                        {

                            "key": "20180415",

                            "doc_count": 6,

                            "amount": {

                                "value": 29032

                            }

                        },

                        {

                            "key": "20180425",

                            "doc_count": 6,

                            "amount": {

                                "value": 21592

                            }

                        }

                        ......

                    ]

                }

            },

            {

                "key": "alipay",

                "doc_count": 67,

                "amount": {

                    "value": 333712

                },

                "payment_date": {

                    "doc_count_error_upper_bound": 0,

                    "sum_other_doc_count": 23,

                    "buckets": [

                        ......

                        {

                            "key": "20180506",

                            "doc_count": 8,

                            "amount": {

                                "value": 38280

                            }

                        },

                        {

                            "key": "20180426",

                            "doc_count": 6,

                            "amount": {

                                "value": 41052

                            }

                        }

                        ......

                    ]

                }

            }

        ]

    }

    4、JAVA操作ES

     根据自己下载的ES版本下载对应版本的JAR包,我安装的是5.6.9所以我的JAR包版本也应该是5.6.9

    <dependency>

        <groupId>org.elasticsearch</groupId>

        <artifactId>elasticsearch</artifactId>

        <version>5.6.9</version>

    </dependency>

    <dependency>

        <groupId>org.elasticsearch.client</groupId>

        <artifactId>transport</artifactId>

        <version>5.6.9</version>

    </dependency>

     创建一个helper操作ES,由于我的ES项目是基于SpringBoot的所以我的helper决定交由spring去管理,其实写一个单例也是可以的,先创建client连接

    import org.elasticsearch.common.settings.Settings;

    import org.elasticsearch.client.transport.TransportClient;

    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    import java.net.InetAddress;

    Settings settings = Settings.builder().put("cluster.name", "tsk-es").put("client.transport.sniff", true)

                        .build();

    TransportClient client = new PreBuiltTransportClient(settings)

                        .addTransportAddresses(new InetSocketTransportAddress(InetAddress.getByName(HOST), PORT));

     插入数据比较简单你可以直接插入JSON字符串,也可以传递JAVA BEAN

    import org.elasticsearch.action.index.IndexResponse;

    import org.elasticsearch.common.xcontent.XContentType;

    IndexResponse response = client.prepareIndex(index, mapping).setSource(jsonStr, XContentType.JSON)

                    .get();

     查询就比较麻烦了,已上面最后一个查询先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额为例:

    public void getAmountData(Long startTimestamp, String... shopIds) {

        // 先初始化一个SearchRequestBuilder,指向shopsInfo/shopsOrder

        SearchRequestBuilder sbuilder = client.prepareSearch("shopsinfo").setTypes("shopsOrder");

        // 条件一

        TermsQueryBuilder mpq = QueryBuilders.termsQuery("shopid", shopIds);

        // 条件二

        RangeQueryBuilder mpq2 = QueryBuilders.rangeQuery("timestamp").gte(startTimestamp);

        // 初始化QueryBuilder

        QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(mpq).must(mpq2);

        // 将QueryBuilder放入SearchRequestBuilder

        sbuilder.setQuery(queryBuilder);

        sbuilder.setSize(0);

        // sum,这里创建一个实例全部用这个实例就行

        SumAggregationBuilder salaryAgg = AggregationBuilders.sum("query_amount").field("amount");

        TermsAggregationBuilder paymentAgg = AggregationBuilders.terms("paymentType").field("paymentType");

        paymentAgg.size(100);

        paymentAgg.subAggregation(salaryAgg);

        TermsAggregationBuilder groupDateAff = AggregationBuilders.terms("payment_date").field("createdate")

                .order(Order.term(true));

        groupDateAff.size(100);

        groupDateAff.subAggregation(salaryAgg);

        paymentAgg.subAggregation(groupDateAff);

        // 将统计查询放入SearchRequestBuilder

        sbuilder.addAggregation(salaryAgg).addAggregation(paymentAgg);

        SearchResponse response = sbuilder.execute().actionGet();

        Map<String, Aggregation> aggMap = response.getAggregations().asMap();

        // 获取全部的总额

        InternalSum shopGroupAllAmount = (InternalSum) aggMap.get("amount");

        Double amount = shopGroupAllAmount.getValue();

        ......

    }

     SearchResponse中就已经可以获取到全部的信息,至于后续怎么解析数据那就看具体业务需求以及每个人的习惯。ES的各种操作说简单也简单说复杂也复杂,按照朋友的话说就是用熟了自然就简单,确实也是这样,不管用啥都得深入了解一下,要不然自己就是个坑!比如我在做统计查询的时候返回的结果总是比应该的结果要少很多,而少的数量总是出现在sum_other_doc_count这个字段里,研究了半天才发现原来统计的结果也需要传递size参数,否则一样默认10条!

     最后,还是要感谢一下我那朋友的,虽然他对我想构建各种大数据平台的事情嗤之以鼻(因为我们数据量确实不大),但依然输出了一下他所能想到的最优解。

    原文

    相关文章

      网友评论

        本文标题:转~ElasticSearch的搭建与数据统计

        本文链接:https://www.haomeiwen.com/subject/rtgvdftx.html