Using Spark SQL with Elasticsearch

Author: DreamsonMa | Published 2019-08-06 15:28

    elasticsearch-hadoop allows Elasticsearch to be used in Spark in two ways: through the dedicated (native) support available since version 2.1, or through the Map/Reduce bridge available since version 2.0. Spark 2.0 itself has been supported since elasticsearch-hadoop 5.0.

    Adding ES support to Spark

    1. Add the Maven dependencies

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>7.2.0</version>
        <scope>test</scope>
    </dependency>
    

    2. Add the base configuration

    More configuration options are covered in the official elasticsearch-hadoop configuration reference. Most es.* settings can also be supplied per call instead of globally, as shown in the sketch after the helper class below.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Shared Spark / Elasticsearch bootstrap helper.
     *
     * @Author: majx2
     * @Date: 2019-8-2 09:41
     */
    public class SparkHelper {
    
        // One SparkSession for the whole run, built from the ES-aware configuration below.
        private static SparkSession session = SparkSession.builder().config(getConf()).getOrCreate();
    
        public static JavaSparkContext getContext(){
            return JavaSparkContext.fromSparkContext(session.sparkContext());
        }
        public static SparkSession getSession() {
            return session;
        }
    
        private static SparkConf getConf(){
            final SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local[4]");
            conf.set("es.nodes", "127.0.0.1");            // Elasticsearch node(s)
            conf.set("es.port", "9200");                  // HTTP port
            conf.set("es.net.http.auth.user", "elastic"); // basic auth user
            conf.set("es.net.http.auth.pass", "elastic"); // basic auth password
            conf.set("es.scroll.size", "10000");          // documents per scroll request when reading
            return conf;
        }
    
        // Schema used when converting ES documents into a Dataset<Row>: (id, entity-as-JSON).
        public static StructType getStructType() {
            List<StructField> fields = new ArrayList<>();
            StructField field;
            field = DataTypes.createStructField("id", DataTypes.StringType, true);
            fields.add(field);
            field = DataTypes.createStructField("entity", DataTypes.StringType, true);
            fields.add(field);
            return DataTypes.createStructType(fields);
        }
    }
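
    Most es.* settings do not have to live in the SparkConf: the JavaEsSpark / JavaEsSparkSQL methods also accept a Map<String, String> of settings that applies only to that call. A minimal sketch, using the same imports as the test class in the next step; the index name and the settings chosen here are illustrative:

    // A sketch only: these settings apply to this one save; "spark-config-demo" is just an example index.
    private static void writeWithPerCallConfig() {
        Map<String, ?> doc = ImmutableMap.of("id", 1, "name", "demo");
        Map<String, String> cfg = ImmutableMap.of(
                "es.mapping.id", "id",           // take the document _id from the "id" field
                "es.write.operation", "upsert"); // upsert instead of the default index operation
        JavaRDD<Map<String, ?>> rdd = SparkHelper.getContext().parallelize(ImmutableList.of(doc));
        JavaEsSpark.saveToEs(rdd, "spark-config-demo/_doc", cfg);
    }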
    

    3. Set up a skeleton to run the examples

    import cn.hutool.core.bean.BeanUtil;
    import cn.hutool.core.map.MapUtil;
    import cn.hutool.db.Db;
    import cn.hutool.db.Entity;
    import com.alibaba.fastjson.JSON;
    import com.midea.ec.fc.datacenter.common.spark.dto.CustomOrderReport;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.elasticsearch.spark.rdd.Metadata;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
    import org.spark_project.guava.collect.ImmutableList;
    import org.spark_project.guava.collect.ImmutableMap;
    import scala.Tuple2;
    
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Spark + Elasticsearch examples; the individual methods are shown section by section below.
     *
     * @Author: majx2
     * @Date: 2019-8-2 09:38
     */
    @Slf4j
    public class SparkEsTest {
    
        final static JavaSparkContext jsc = SparkHelper.getContext();
        final static SparkSession session = SparkHelper.getSession();
        final static SQLContext sql = new SQLContext(SparkHelper.getSession());
    
        public static void main(String[] args) throws SQLException {
            // Write data to ES
            simpleWrite();
            simpleWrite2();
            jsonWrite();
            dynamicIndexWrite();
            saveWithMeta();
            saveBySQL();
            // Read data from ES
            readByRDD();
            readBySQL();
        }
    }
    

    Writing data to ES

    1. Write two records via an RDD

    The es.mapping.id setting takes the document _id from the id field.

    private static void simpleWrite(){
        Map<String, ?> numbers = ImmutableMap.of("id", 1, "one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("id", 2, "OTP", "Otopeni", "SFO", "San Fran");
        // create a simple RDD with the two documents and index them, taking the _id from the "id" field
        JavaRDD<Map<String, ?>> javaRDD = SparkHelper.getContext().parallelize(ImmutableList.of(numbers, airports));
        JavaEsSpark.saveToEs(javaRDD, "spark/demo", ImmutableMap.of("es.mapping.id", "id"));
    }
    

    The result:

    (Screenshot: documents written to ES via the RDD)

    2. Write Java objects

    Prepare the entity class and make it implement Serializable; without serialization Spark throws an exception.

    import lombok.Data;
    
    import java.io.Serializable;
    import java.sql.Timestamp;
    
    /**
     * Entity mapped from the f_order_report_701 table; must be Serializable for Spark.
     *
     * @Author: majx2
     * @Date: 2019-8-5 11:04
     */
    @Data
    public class CustomOrderReport implements Serializable {
        private static final long serialVersionUID = 4858959432062088728L;
        private Long id;
        private Timestamp shipedReturnTime;
        private String customerCode;
    }
    

    Then write it via an RDD:

    private static void simpleWrite2() throws SQLException {
        // load a few rows from the database (Hutool Db) and map them onto the entity class
        List<CustomOrderReport> reportList = new ArrayList<>();
        List<Entity> list = Db.use().query("select * from f_order_report_701 where id < ?", 30);
        list.forEach(p -> {
            CustomOrderReport report = p.toBean(CustomOrderReport.class);
            reportList.add(report);
        });
        JavaRDD<CustomOrderReport> javaRDD = jsc.parallelize(reportList);
        JavaEsSpark.saveToEs(javaRDD, "spark-simple2/_doc", ImmutableMap.of("es.mapping.id", "id"));
    }
    

    The result:

    (Screenshot: the objects indexed into ES)

    3. Write JSON to ES

    public static void jsonWrite(){
        String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
        String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
        JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
        JavaEsSpark.saveJsonToEs(stringRDD, "spark-json/_doc");
    }
    

    The result:

    (Screenshot: JSON documents indexed into ES)

    4. Write to a dynamic index

    Use a placeholder such as {media_type} in the resource name; media_type must be a field key present in the data. A date-based variant is sketched after the screenshot below.

    public static void dynamicIndexWrite(){
        Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI",
                "year", "1994");
        JavaRDD<Map<String, ?>> dynamicRDD = jsc.parallelize(ImmutableList.of(game));
        JavaEsSpark.saveToEs(dynamicRDD, "spark-collection-{media_type}/_doc");
    }
    

    The result:

    (Screenshot: the dynamically named index)
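
    Per the connector's configuration docs, the placeholder can also format a date field with {field|date-format}, so that documents land in time-based indices. A hedged sketch (the field names, the timestamp value, and the index pattern are illustrative, and the field must hold a value the connector can parse as a date):

    // A sketch only: route each document to a daily index such as spark-events-2019.08.06.
    public static void dynamicDailyIndexWrite() {
        Map<String, ?> event = ImmutableMap.of("media_type", "game", "title", "FF VII",
                "@timestamp", "2019-08-06T15:28:00Z");
        JavaRDD<Map<String, ?>> rdd = jsc.parallelize(ImmutableList.of(event));
        JavaEsSpark.saveToEs(rdd, "spark-events-{@timestamp|yyyy.MM.dd}/_doc");
    }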

    5. Write with document metadata

    public static void saveWithMeta(){
        // the documents to index
        Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
        Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");
    
        // per-document metadata (_id and _version)
        Map<Metadata, Object> otpMeta = ImmutableMap.of(Metadata.ID, 1, Metadata.VERSION, "1");
        Map<Metadata, Object> sfoMeta = ImmutableMap.of(Metadata.ID, "2", Metadata.VERSION, "23");
    
        // create a pair RDD between the metadata and the docs
        JavaPairRDD<?, ?> pairRDD = jsc.parallelizePairs(
                ImmutableList.of(new Tuple2(otpMeta, otp), new Tuple2(sfoMeta, sfo))
        );
        JavaEsSpark.saveToEsWithMeta(pairRDD, "spark-with-meta-data/_doc");
    }
    

    The result:

    (Screenshot: documents written with metadata)
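
    Besides ID and VERSION, the Metadata enum exposes further keys, for example ROUTING. A hedged sketch along the same lines as above (the routing value and index name are illustrative):

    // A sketch only: attach an explicit routing key to a document via Metadata.ROUTING.
    public static void saveWithRouting(){
        Map<String, ?> muc = ImmutableMap.of("iata", "MUC", "name", "Munich");
        Map<Metadata, Object> mucMeta = ImmutableMap.of(Metadata.ID, 3, Metadata.ROUTING, "europe");
        JavaPairRDD<?, ?> pairRDD = jsc.parallelizePairs(ImmutableList.of(new Tuple2(mucMeta, muc)));
        JavaEsSpark.saveToEsWithMeta(pairRDD, "spark-with-meta-data/_doc");
    }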

    6. Save results with Spark SQL

    private static void saveBySQL(){
        // read the documents matching the query from one index and write them to another
        final Dataset<Row> dataset = JavaEsSparkSQL.esDF(sql, "spark-simple/_doc", "?q=Otopeni");
        JavaEsSparkSQL.saveToEs(dataset, "spark-sql/_doc", ImmutableMap.of("es.mapping.id", "id"));
    }
    

    The result:

    (Screenshot: data written through Spark SQL)
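
    The third argument of esDF is not limited to URI queries such as ?q=Otopeni; a full Query DSL body can be passed as well. A hedged sketch (the query and the target index are illustrative):

    // A sketch only: filter at the source with a Query DSL body instead of a URI query.
    private static void saveBySQLWithDsl(){
        String query = "{\"query\":{\"match\":{\"OTP\":\"Otopeni\"}}}";
        final Dataset<Row> dataset = JavaEsSparkSQL.esDF(sql, "spark-simple/_doc", query);
        JavaEsSparkSQL.saveToEs(dataset, "spark-sql-dsl/_doc", ImmutableMap.of("es.mapping.id", "id"));
    }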

    Reading data from ES

    1. Read ES data via an RDD

    The documents are then converted into a Dataset<Row> with the custom schema from SparkHelper.

    private static void readByRDD() {
        // esRDD returns pairs of (_id, source map)
        JavaPairRDD<String, Map<String, Object>> esPairRDD = JavaEsSpark.esRDD(SparkHelper.getContext(), "f_order_report_701/_doc");
        JavaRDD<Row> esRDD = esPairRDD.map(entity -> {
            Map<String, Object> source = entity._2();
            // drop bookkeeping fields and keep a stable field order before serializing
            source.remove("esCreateTime");
            source.remove("esUpdateTime");
            source = MapUtil.sort(source);
            return RowFactory.create(entity._1(), JSON.toJSONString(source));
        });
        Dataset<Row> dataset = SparkHelper.getSession().createDataFrame(esRDD, SparkHelper.getStructType());
        dataset.show(3);
        log.info(dataset.head().toString());
    }
    

    Output:

    +---+--------------------+
    | id|              entity|
    +---+--------------------+
    | 69|{"accountTime":15...|
    | 70|{"accountTime":15...|
    | 72|{"accountTime":15...|
    +---+--------------------+
    only showing top 3 rows
    
    2019-08-06 14:41:22.460 [main]  [INFO ] [c.m.e.f.d.common.spark.SparkEsTest] - [69,{"accountTime":1563379200000,"applyType":5,"cateId":"1","createTime":1563420070000,"customerCode":"C0010850","franchiser":"0","id":69,"isAdjust":0,"itemId":"31031050000042","orderAttribute":2,"ouId":701,"outerOrderId":"920190718004","outerStoreCode":"SHC44F4140","salesCenterCode":"3608380682-1","salesChannel":2,"salesOrderId":"OM19071800000003","settlementAmount":699.0,"shipedReturnTime":1563420061000,"shopId":2,"status":5,"updateTime":1563421756000,"writeOffAmount":0.0,"writeOffStatus":0}]
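
    esRDD also accepts a query argument, so only matching documents leave the Elasticsearch cluster. A small sketch along the lines of the method above (the query string is illustrative):

    // A sketch only: read just the documents matching a URI query.
    private static void readByRDDWithQuery() {
        JavaPairRDD<String, Map<String, Object>> esPairRDD =
                JavaEsSpark.esRDD(jsc, "f_order_report_701/_doc", "?q=customerCode:C0010850");
        log.info("matched documents: {}", esPairRDD.count());
    }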
    

    2. Read ES data with Spark SQL

    private static void readBySQL() {
        // register the ES-backed DataFrame as a temporary view and query it with SQL
        final Dataset<Row> dataset = JavaEsSparkSQL.esDF(sql, "spark-simple/_doc");
        dataset.createOrReplaceTempView("simple");
        final Dataset<Row> result = session.sql("SELECT * FROM simple WHERE id = 1");
        result.show();
    }
    

    Output:

    +----+----+---+---+---+
    | OTP| SFO| id|one|two|
    +----+----+---+---+---+
    |null|null|  1|  1|  2|
    +----+----+---+---+---+
    

    3. A faster approach: pushdown

    An important (and somewhat hidden) feature of using elasticsearch-hadoop as a Spark source is that the connector understands the operations performed on the DataFrame/SQL and, by default, translates them into the appropriate Query DSL. In other words, the connector pushes the operations down to the source, where the data is filtered efficiently so that only the required data is streamed back to Spark. This dramatically improves query performance and keeps CPU, memory, and I/O on both the Spark and Elasticsearch clusters to a minimum, since only the needed data is returned (rather than returning the data in bulk for Spark to process and discard). Note that pushdown applies even when a query is specified: the connector enhances that query according to the given SQL.

    private static void readByPushDown(){
        Dataset<Row> dataset;
    
        // DataFrame route: the filter and the column pruning are pushed down to ES
        dataset = sql.read().format("org.elasticsearch.spark.sql").load("spark-simple/_doc");
        dataset = dataset.filter(dataset.col("id").equalTo(1)).select("id", "one", "two");
        dataset.show();
    
        // SQL route: register the ES index as a table backed by the same data source
        session.sql("CREATE TEMPORARY TABLE simple USING org.elasticsearch.spark.sql OPTIONS (path 'spark-simple/_doc', scroll_size '20')");
        dataset = session.sql("SELECT id,one,two FROM simple WHERE id = 1");
        dataset.show();
    }
    

    Output:

    +---+---+---+
    | id|one|two|
    +---+---+---+
    |  1|  1|  2|
    +---+---+---+
    
    +---+---+---+
    | id|one|two|
    +---+---+---+
    |  1|  1|  2|
    +---+---+---+
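
    Whether a filter actually reached the connector can be checked from Spark itself: Dataset.explain(true) prints the physical plan, and the scan node for the Elasticsearch relation normally lists the pushed filters. A small sketch:

    // A sketch only: inspect the physical plan to confirm the filter was pushed down to ES.
    private static void explainPushDown(){
        Dataset<Row> dataset = sql.read().format("org.elasticsearch.spark.sql").load("spark-simple/_doc");
        dataset.filter(dataset.col("id").equalTo(1)).select("id", "one", "two").explain(true);
    }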
    

    The End!

    Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/7.3/spark.html
