美文网首页简友广场
6.Maven项目中使用Spark 2.x读写Elasticse

6.Maven项目中使用Spark 2.x读写Elasticse

作者: 依米兒 | 来源:发表于2020-04-14 16:34 被阅读0次
    一、使用spark写数据到Elasticsearch中
    • 连接spark,配置es(前提是maven中所需的基本Spark环境都配置好了,否则还需要添加spark-core的依赖等)
      注:如果是在联网的环境下,在项目的配置文件pom.xml文件中添加依赖,这里我安装的Elasticsearch版本为7.6.2,这里对应自己Elasticsearch的版本,相关依赖的版本可以在官网上查看。
    <dependency>
        <groupId>org.apche.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apche.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apche.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>7.6.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>16.0.1</version>
    </dependency>
    

    如果是在脱机状态下,则还需要自己先根据自己Spark版本和Elasticsearch版本下载好对应的包,然后放入到C:\用户\用户名.m2\repository\中对应文件夹下,然后强制更新maven项目。

    import org.apache.spark.sql.SparkSession;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import scala.Tuple2;
    import java.util.Map
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    
    // es.node默认是localhost,如果你的es集群不在本地,把ip换成你集群中master节点所在机器的ip地址
    //es.port是设置es集群中master节点的端口,如果没有进行更改,就是9200,不知道可以去服务器上安装路径下查看config/elasticsearch.yml文件中http.port参数的值
    SparkSession session=SparkSession.builder().master("local[*]")
                       .appName("yourappName")
                       .config("es.node","ip")
                       .config("es.port","9200")
                       .config("es.resource","index/type")
                       .config("es.index.auto.create",true)
    //                 .config("es.mapping.id","id")
                       .config("es.nodes.wan.only", true)
                       .getOrCreate();
    session.sparkContext().setLogLevel("WARN");
    JavaSparkContext jsc=new JavaSparkContext(session.sparkContext());
    

    其他参数配置可查看官方文档

    • JavaEsSpark中有两个写入方法:saveToEs和saveJsonToEs
      • JavaEsSpark.saveToEs(要写入的数据是Map格式)
      • JavaEsSpark.saveJsonToEs(要写入的数据是Json格式,这里使用字符串拼接的方式,其实可以用例如阿里的fastjson包的JSON对象,方便,出错概率也少)
    • 使用saveJsonToEs写入数据到ES
      1.上传的样例数据 待上传数据.png 2.spark代码,这里也可以引入com.alibaba.fastjson.JSONObject包,将需要写入的数据转换成json格式,避免字符串拼接时出现出错。
    JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");       
    JavaRDD<String> writeRDD=readRDD.map(new Function<String, String>() {
        /**
         * 将文本数据写入到ES索引中
        */
        private static final long serialVersionUID = 1L;
        public String call(String v1) throws Exception {
            // TODO Auto-generated method stub
            String[] info=v1.split("\t",-1);
            String fournum;
            fournum="{\"name\":\""+info[0]+"\",\"sex\":\""+info[1]+"\",\"age\":\""+info[2]+"\",\"num\":\""+info[3]+"\"}";
            return fournum;
        }
                
    });
    JavaEsSpark.saveJsonToEs(writeRDD,"test2/four");
    
    • 使用saveToEs将本地文本文件写入到Elasticsearch中
    //先读取文本数据到JavaRDD中,然后将String转换成Map格式,id自动生成
    JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");       
    JavaRDD<Map<String, Object>> writeRDD=readRDD.map(new Function<String, Map<String,Object>>() {
        public Map<String, Object> call(String v1) throws Exception {
            String[] info=v1.split("    ");
            Map<String,Object> users=ImmutableMap.of("name",info[0],"sex",info[1],"age",info[2],"num",info[3]);         
            return users;
        }
    });
    //需要数据是Map格式
    JavaEsSpark.saveToEs(writeRDD, "users/admin");
    

    上传结果可以在安装的可视化工具中查看,也可用curl查看。这里在head插件里可以看到。

    head插件中查看上传后的数据.png
    注:
    1.这里的索引是自动创建的。在插入数据前先看es中是否存在同名的索引,如果有如果结构一样,就不需要再自动创建,如果存在的不需要,可以使用curl命令删除:curl -XDELETE "http://localhost:9200/index"。也可以在es中先根据需求创建好索引,然后直接插入数据。
    2.插入数据时自动生成(如果想指定id,在spark中配置es.mapping.id那一句注释删掉)
    3.分片数和备份数都是1,这个是可以随时修改更新(可使用官方给的kibana工具,官网上有资料,方便快捷)的。
    另外还有很多问题后面深入研究,例如:如何在继续往该索引中插入数据时,保证同一内容的数据不重复生成id插入
    二、查询Elasticsearch中的数据
    • 在所有字段上查询符合条件的数据,例如前面已经导入的数据:test索引中user类的数据。
      注:如果只是查询已经在Elasticsearch中的数据,spark创建连接时,可以不用配置:“es.resource”参数,把“es.index.auto.create”参数设置为false
    //查询数据中包含25的记录,网页上的请求地址:http://masterIP:9200/users/admin/_search/?q="25"&pretty
    // 如果要查询索引中的所有数据,可以不传入第三个参数:"?q=\"25\""
    JavaPairRDD<String, Map<String, Object>> searchRDD=JavaEsSpark.esRDD(jsc,"users/admin","?q=\"25\"");
    searchRDD.foreach(new VoidFunction<Tuple2<String,Map<String,Object>>>() {
        /**
         * searchRDD中的第一个string的值是每条记录的在Elasticsearch索引中的id值
         * 使用http://masterIP:9200/users/admin/id值?pretty 也可查看数据
         */
        private static final long serialVersionUID = 1L;
    
        public void call(Tuple2<String, Map<String, Object>> t) throws Exception {
            // TODO Auto-generated method stub
            System.out.println(t._1+"---------------"+t._2);
        }
    });
    
    通过http请求查询符合条件的数据.png 通过id值查看数据内容.png

    相关文章

      网友评论

        本文标题:6.Maven项目中使用Spark 2.x读写Elasticse

        本文链接:https://www.haomeiwen.com/subject/ziyrmhtx.html