美文网首页@产品IT阔论
Elasticsearch趟坑记——后台数据篇

Elasticsearch趟坑记——后台数据篇

作者: 亦一亦二 | 来源:发表于2018-09-30 17:20 被阅读22次
    image

    背景:

    项目隶属于用户画像服务,为了更好的支持前端查询服务,决定将原有的HBase数据源切换成Elasticsearch(以下简称ES),从k-v型数据库切到nosql。本以为是个比较简单的任务,做起来才发现有各种问题,简单记录下。

    本文将从数据写入,索引设置两方面简述遇到的一些问题。
    基本集群情况如下:三台ES,32G内存,其中16G内存分给ES,单台500G SSD。

    数据写入

    • hive to ES
      有专门的工具包,elasticsearch-hadoop-6.3.2.jar,需要注意的是后边的版本号需要和ES的版本一致。将hive表建成外部表,向外部表中写入数据即可。基本语句:
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES(
    'es.nodes' = '192.168.0.1',
    'es.port'='9200',
    'es.resource' = 'user_profile/user_profile',//表示ES的index/type
    'es.mapping.id' = 'user_id',//指定_id的取值
    'es.mapping.names' = 'name:NAME'//hive字段与ES字段的对应
    )
    

    有两个问题需要注意:

    1. 'es.mapping.id'手动设置,同一index/type的_id具有唯一性,可以进行数据去重,保证数据恢复时的幂等性等。但_id设计到ES的底层存储,需要足够离散,尽量不要用有规律的数字(类比HBase的Rowkey设计),以保证ES的性能,推荐用MD5之后的值作为_id,或用ES自己生成UUID。
    2. hive表名与字段名称不敏感,而ES是敏感的,如果不指定es.mapping.names,导入的ES index的字段将会全部都为小写。这里推荐字段的命名采用下划线的方式以减少可能遇到的问题。
    • spark写入ES
      简单的同步可以用外部表,但如果有一些复杂的逻辑计算,先将结果写入hive再导入ES肯定不是一个好方案,还有需求时从ES查询全量数据后将数据经过计算结果写回ES。对于上述任务,只能自己写代码了。ES提供了spark的访问的工具包。
     libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.3.2"
    

    ES的节点配置写入到sparkconf中:

      val conf = new SparkConf().setAppName("UserProfile")
      conf.set("es.nodes", "192.168.0.01")
      conf.set("es.port", "9200")
      conf.set("es.index.auto.create", "true")
      conf.set("es.nodes.wan.only", "true")
    

    读取写入如下:

      val result_rdd = sc.esRDD("user_profile/user_profile","{\"query\": {\"bool\": {\"must\": [{\"match\": {\"name\": \"test\"}}]}}}")
    
      EsSpark.saveToEs(result_rdd, "user_profile_new/user_profile_new")
    

    上述代码为rdd与ES的交互,dataframe类似,不在赘述。

    这几行代码也带来了本项目前期遇到的一个较大问题:性能,最初观察大概每10s左右有1K数据写入ES,这个效率是完全无法满足线上服务的要求的。经过排查发现是sc.esRDD这个读取的瓶颈,如果spark的并行读设置的足够大,则很容易将ES的cpu打满,如果不大则性能无法满足要求。应该esRDD这个底层实现机制导致,目前没去了解源码,有时间可以仔细排查下具体原因。

    最终本项目的解决方案为用http请求替换sc.esRDD这个读入数据的方法,并通过游标的方式,获取全量的结果数据,最终代码如下:

      val url = "http://192.168.0.1:9200/user_profile/_search?_source=user_id&&scroll=1m"
      val response: HttpResponse[String] = Http(url).method("PUT").postData(x._2).header("Content-Type", "application/json;charset=utf-8").timeout(connTimeoutMs = 1000, readTimeoutMs = 600000).asString
      val parser = new JsonParser()
      val jsonstr = parser.parse(response.body).asInstanceOf[JsonObject]
      val json_array = jsonstr.getAsJsonObject("hits").get("hits").getAsJsonArray()
      val scroll_id = jsonstr.getAsJsonPrimitive("_scroll_id")//通过_scroll_id循环获取全量数据
    

    循环内部代码:

      val parser = new JsonParser()
      val scroll_url = "http://192.168.0.1:9200/_search/scroll"
      val scroll_response: HttpResponse[String] = Http(scroll_url).method("PUT").postData("{ \"scroll\": \"1m\",\"scroll_id\":" + scroll_id + " }").header("Content-Type", "application/json;charset=utf-8").timeout(connTimeoutMs = 1000, readTimeoutMs = 600000).asString
      val scrol_jsonstr = parser.parse(scroll_response.body).asInstanceOf[JsonObject]
      val scroll_json_array = scrol_jsonstr.getAsJsonObject("hits").get("hits").getAsJsonArray()
    

    最终测试结果,每分钟大约10W数据写入,初步满足要求。

    索引设置

    • 写入性能
      上文简单说了下spark写入ES的性能问题,其实hive外部表导入时也会遇到性能瓶颈,简单介绍下我对索引做了哪些设置:
      a、分片与副本:分片数最好为机器的整倍数。为了提高写入速度,可以先将副本数关闭,写入完成后再打开。这样写入时只会写入一个节点,可以大幅提高写入性能。打开副本数时,ES只会消耗网络与IO资源,不会占用cpu。副本数可以提高一定的搜索性能,理性设置。
    curl -H "Content-Type: application/json" -XPUT ${str} -d'
    {
        "settings" : {
            "index" : {
                "number_of_shards" : 3,//分片数
                "number_of_replicas" : 0.//副本数
            }
        }
    }'
    

    b、索引字段类型:hive外部表写入ES时,如果不指定索引字段的类型,hive中int会自动转成long,double会转成float,最好按需要先建索引,然后再建外部表。关于字段是否需要被索引也需要提前设置,默认是全加索引,这也非常影响写入性能与存储空间。
    c、还有一些索引的参数配置,也会一定程度上影响写入速度。

    "refresh_interval": "300s"//刷新间隔,默认1s,如果不要求写入数据实时被检索出来,加大之。
    "translog.flush_threshold_size": "4096mb" //刷新的文件大小阈值
    "index.translog.flush_threshold_ops:":1000000//日志多少条进行flush
    "indices.store.throttle.max_bytes_per_sec" : "100mb"//加大merge速度
    ……
    

    总的来说,分片与副本最影响性能,其次是字段是否需要索引。由于ES的研发思路就是开箱即用,默认配置以满足绝大多数场景,如果不是对ES有一定了解,尽量减少对参数的修改,有时会获得更好的性能。

    • 在线服务
      由于需要对外提供在线服务,如何保证数据更新时的服务也是一个问题。本项目采用了ES自带的别名机制,对外提供的永远是一个确定的名称。每天的数据写入为带时间戳的版本,当数据验证完成时,将新的表的别名设成提供服务的表名。
    curl -H "Content-Type: application/json" -XPOST ${str} -d'
    {
        "actions": [
            { "remove": { "index": "user_profile_'${1}'", "alias": "user_profile" }},
            { "add":    { "index": "user_profile_'${2}'", "alias": "user_profile" }}
        ]
    }'//更改别名
    

    ES默认提供了原子性的操作,保证了同时成功与失败。

    本文简述了后台数据写入ES时遇到的一些问题与解决方案,后续为业务方提供基础的RPC服务也遇到了较多问题,有时间继续总结。

    PS:不得不佩服ES的更新速度,在本项目进行的过程中ES就升级到了6.4.1,让我这种只喜欢用最新版本的,很难受。

    相关文章

      网友评论

        本文标题:Elasticsearch趟坑记——后台数据篇

        本文链接:https://www.haomeiwen.com/subject/kwadoftx.html