Spark Development Notes

Author: 走在成长的道路上 | Published: 2019-03-08 10:23

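Over the past couple of days I have been trying to combine Spark with Spring Boot, so that computation-heavy logic over large data sets can be handed straight to Spark. Adding Spark SQL on top keeps the code close to ordinary web development.

Everything runs inside a Spring Boot application, so these notes use Java as the main development language.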

Dependencies

  • Version properties
# spark
spring_data_hadoop_version = 2.3.0.RELEASE
spark_version = 2.3.2
scala_version = 2.11
  • Gradle configuration
compile("org.springframework.data:spring-data-hadoop-boot:${spring_data_hadoop_version}")
compile("org.springframework.data:spring-data-hadoop-batch:${spring_data_hadoop_version}")
compile("org.springframework.data:spring-data-hadoop-spark:${spring_data_hadoop_version}")

compile("org.apache.spark:spark-yarn_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-core_${scala_version}:${spark_version}") {
    exclude group: 'com.fasterxml.jackson.module', module: "jackson-module-scala_${scala_version}"
}

compile("com.fasterxml.jackson.module:jackson-module-scala_${scala_version}:2.9.4")

compile("org.apache.spark:spark-streaming_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-hive_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-sql_${scala_version}:${spark_version}")
compile("org.apache.spark:spark-mllib_${scala_version}:${spark_version}")
// hbase
compile 'org.apache.phoenix:phoenix-spark:5.0.0-HBase-2.0'

Spark SQL basics

  1. Querying a JSON file
SparkConf  sparkConf = new SparkConf().setAppName("sparkTest")
                .setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
// The DataFrame that is read in can be treated as a table
Dataset<Row> jsonDataFrame = sqlContext.read().json(SparkSQLTest.class.getResource("/data/json/students.json").getPath());
// Print the table
jsonDataFrame.show();
// Print the schema
jsonDataFrame.printSchema();
// Select columns and compute on them
jsonDataFrame.select("name").show();
jsonDataFrame.select(jsonDataFrame.col("name"), jsonDataFrame.col("score").plus(1)).show(); // add 1 to every value in the score column
// Filter rows
jsonDataFrame.filter(jsonDataFrame.col("score").gt(80)).show();
// Group by a column and count
jsonDataFrame.groupBy("score").count().show();
sc.close();
  1. Querying a JSON file with SQL
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
// The DataFrame that is read in can be treated as a table
Dataset<Row> jsonDataFrame = sqlContext.read().json(SparkSQLTest.class.getResource("/data/json/students.json").getPath());
jsonDataFrame.createOrReplaceTempView("student");
sqlContext.sql("select * from student").show();
sc.close();
  1. Querying a database through a temporary view
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = SQLContext.getOrCreate(JavaSparkContext.toSparkContext(sc));
String sql = " (select * from t_app) as application";
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://172.19.xxx.xxx:3306/stat"); // database URL
reader.option("dbtable", sql); // table name, here an aliased subquery
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "passwd123");

Dataset<Row> jdbcRDD = reader.load();
jdbcRDD.show();
jdbcRDD.createOrReplaceTempView("xxxx");
sqlContext.sql("select id, appid from xxxx").show();
sc.close();
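  1. Processing text data

If the steps below are not written as lambdas, a serialization exception (java.io.NotSerializableException) is thrown. I have not yet confirmed the cause; a likely one is that an anonymous inner class keeps a reference to its enclosing object, which is usually not serializable, whereas a lambda that does not touch the enclosing object captures nothing extra.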
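The serialization failure described above can be reproduced without Spark. The sketch below is only an illustration of one common cause, not a diagnosis of the original project: `SerializableFunction` and `ClosureCaptureDemo` are hypothetical names standing in for a Spark function interface and for a non-serializable enclosing class such as a Spring bean or a test class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

/** Hypothetical stand-in for a Spark function interface: a Function that is also Serializable. */
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

/** Hypothetical enclosing class that is NOT Serializable. */
final class ClosureCaptureDemo {
    private final String separator = " ";

    /** Anonymous class: it reads this.separator, so it holds a reference to the enclosing instance. */
    SerializableFunction<String, Integer> anonymousVersion() {
        return new SerializableFunction<String, Integer>() {
            @Override
            public Integer apply(String line) {
                return line.split(separator).length;
            }
        };
    }

    /** Lambda: the value is copied into a local first, so only that String is captured. */
    SerializableFunction<String, Integer> lambdaVersion() {
        final String sep = separator;
        return line -> line.split(sep).length;
    }

    /** Returns true if Java serialization accepts the object, false on NotSerializableException. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClosureCaptureDemo demo = new ClosureCaptureDemo();
        System.out.println("anonymous class serializable: " + canSerialize(demo.anonymousVersion()));
        System.out.println("lambda serializable: " + canSerialize(demo.lambdaVersion()));
    }
}
```

Note that a lambda which reads `this.separator` directly would capture the enclosing instance as well, so copying the needed values into local variables is the part that matters, not the lambda syntax itself.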

JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> textFileRDD = sc.textFile(SparkSQLTest.class.getResource("/data/doc.txt").getPath());
// All of the following steps are written as lambdas
JavaRDD<String> wordsRDD = textFileRDD.flatMap((FlatMapFunction<String, String>) s -> {
    String[] split = s.split(" ");
    return Arrays.asList(split).iterator();
});
JavaPairRDD<String, Integer> wordsPairRDD = wordsRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> wordCountRDD = wordsPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
wordCountRDD.foreach((VoidFunction<Tuple2<String, Integer>>) stringIntegerTuple2 -> System.out.println(stringIntegerTuple2));
sc.close();
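For a quick sanity check of the word count above, the same split, pair, and reduce steps can be reproduced locally with plain Java streams, with no Spark involved. This is only a reference sketch; the class name `LocalWordCount` and the sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class LocalWordCount {
    /** Splits each line on single spaces and counts how often each token occurs. */
    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // corresponds to the flatMap step: one line becomes many words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // corresponds to mapToPair + reduceByKey: start at 1 and add up per word
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello spark", "hello spring boot");
        System.out.println(count(lines));
    }
}
```

Comparing this map with the collected output of wordCountRDD on the same small input is one way to confirm that the Spark pipeline behaves as intended.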
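  1. Querying in-memory collections

JavaSparkContext.parallelize() converts an ordinary collection into a JavaRDD, on which further operations can then run. Examples:

  • Multiplying values

A List is converted to a JavaRDD and each element is then multiplied by itself.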

JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<String> result = rdd.map((Function<Integer, String>) x -> x * x + "");
System.out.println(StringUtils.join(result.collect(), ","));
  • Querying person data
JavaSparkContext sc = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> words = new ArrayList<>();
words.add(new Tuple2<>("zhangsan", 22));
words.add(new Tuple2<>("lisi", 40));
words.add(new Tuple2<>("zhangsan", 21));
words.add(new Tuple2<>("lisi", 23));
words.add(new Tuple2<>("wangwu", 60));
// Build a JavaPairRDD from the list
JavaPairRDD<String, Integer> wordsPairRDD = sc.parallelizePairs(words);
// Convert each tuple into a Row
JavaRDD<Row> wordRowRDD = wordsPairRDD.map(tuple -> RowFactory.create(tuple._1, tuple._2));
List<StructField> fieldList = new ArrayList<>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
fieldList.add(DataTypes.createStructField("num", DataTypes.IntegerType, false));
// Create the SQL schema
StructType schema = DataTypes.createStructType(fieldList);
SQLContext sqlContext = new SQLContext(sc);
// Build the Dataset<Row>
Dataset<Row> personDF = sqlContext.createDataFrame(wordRowRDD, schema);
personDF.createOrReplaceTempView("person");

sqlContext.sql("select * from person").show();
sc.close();

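In the example above, the List is first converted to a JavaRDD (here a JavaPairRDD), then mapped to Row objects and paired with a schema. SQLContext turns the result into a Dataset<Row>, which is registered as a temporary view and queried with SQL.

  • Querying bean data with SQL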
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));

// Build the Encoder. The class passed to Encoders.bean() must have getters and setters, otherwise no schema is derived
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create a Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
// Register a temporary view
personDataset.createOrReplaceTempView("person");

sqlContext.sql("select name,sum(num) from person group by name").show();
sc.close();
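The result that `select name,sum(num) from person group by name` should produce for this sample data can be worked out by hand in plain Java. A minimal sketch with no Spark involved; `LocalGroupBySum` and its `row` helper are hypothetical names used only for this check.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LocalGroupBySum {
    /** Builds one (name, num) pair, mirroring a Person row. */
    static Map.Entry<String, Integer> row(String name, int num) {
        return new AbstractMap.SimpleEntry<>(name, num);
    }

    /** Sums num per name, which is what the GROUP BY query computes. */
    static Map<String, Integer> sumByName(List<Map.Entry<String, Integer>> rows) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> r : rows) {
            totals.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return totals;
    }

    /** The same five rows used in the Spark example. */
    static List<Map.Entry<String, Integer>> sampleRows() {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        rows.add(row("zhangsan", 22));
        rows.add(row("lisi", 40));
        rows.add(row("zhangsan", 21));
        rows.add(row("lisi", 23));
        rows.add(row("wangwu", 60));
        return rows;
    }

    public static void main(String[] args) {
        // prints {lisi=63, wangwu=60, zhangsan=43}
        System.out.println(sumByName(sampleRows()));
    }
}
```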
  • Converting Dataset<Person> to Dataset<Row>
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));

// Build the Encoder. The class passed to Encoders.bean() must have getters and setters, otherwise no schema is derived
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create a Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
// Convert Dataset<Person> to Dataset<Row>
// Note: toDF assigns the names by position, so their order must match the columns derived from the Person class
Dataset<Row> personDatasetRow = personDataset.toDF("name", "num");
// Register a temporary view
personDatasetRow.createOrReplaceTempView("person");

sqlContext.sql("select name,sum(num) from person group by name").show();
sc.close();

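The differences between JavaRDD<Object>, JavaRDD<Row>, and Dataset<Row> are summarized in the table below:

Name              Description
JavaRDD<Object>   Distributed collection of plain objects
JavaRDD<Row>      Distributed collection of Row objects
Dataset<Object>   Carries a schema with type checking; relational model; supports SQL queries
Dataset<Row>      Carries a schema with type checking; relational model; supports SQL queries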

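The schema of a Dataset<Object> uses the same names as the object's fields.
A Dataset can be converted to a JavaRDD directly with Dataset.toJavaRDD(). Dataset is the stricter structure of the two, so going back is simple, while the reverse conversion requires supplying schema information.
The class passed to Encoders.bean() must have getters and setters, otherwise no schema information is derived.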
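The Person class itself is not shown in these notes. The sketch below is a hypothetical version of it, together with a small check based on standard JavaBeans introspection, which, as far as I understand, is also what Encoders.bean() relies on to find columns. The class and method names are assumptions made for illustration.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BeanPropertyCheck {
    /** Hypothetical Person bean with the getters and setters a bean encoder needs. */
    public static class Person implements Serializable {
        private String name;
        private int num;

        public Person() {}                                   // no-arg constructor for bean-style tools
        public Person(String name, int num) { this.name = name; this.num = num; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getNum() { return num; }
        public void setNum(int num) { this.num = num; }
    }

    /** Same fields but no accessors: introspection finds no properties here. */
    public static class PersonWithoutAccessors implements Serializable {
        String name;
        int num;
    }

    /** Lists the properties that have both a getter and a setter, sorted by name. */
    static List<String> readWriteProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            // Object.class as stop class leaves out the inherited "class" property
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type, Object.class).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(readWriteProperties(Person.class));
        System.out.println(readWriteProperties(PersonWithoutAccessors.class));
    }
}
```

A class for which this check returns an empty list is a likely candidate for the "no schema" symptom described in the notes.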

  1. Importing data
  • Loading data automatically through JDBC
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> cassConfigMap = new HashMap<>();
// Query the data through JDBC
cassConfigMap.put("url", "jdbc:mysql://172.xxx.xxx.xxx:3306/sctest");
cassConfigMap.put("driver", "com.mysql.jdbc.Driver");
cassConfigMap.put("dbtable", "(select * from person) as people");
cassConfigMap.put("user", "user");
cassConfigMap.put("password", "password");
cassConfigMap.put("upperBound", "100");
cassConfigMap.put("lowerBound", "0");
cassConfigMap.put("numPartitions", "1");
cassConfigMap.put("partitionColumn", "id");

Dataset<Row> scheduleDs = sqlContext.read().format("jdbc").options(cassConfigMap).load();
scheduleDs.createOrReplaceTempView("people");
scheduleDs.show();
sc.close();
  • Loading data with JdbcRDD
// TODO: using JdbcRDD here currently fails
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
// XXX Note: JdbcRDD requires lowerBound and upperBound, and the SQL must use both
// (as the two ? placeholders), otherwise an error about "where upperBound > lowerBound" occurs
JdbcRDD<Person> jdbcRDD = new JdbcRDD<>(sc.sc(),
        new DBConnection("com.mysql.jdbc.Driver",
                "jdbc:mysql://172.xxx.xxx.xxx:3306/sctest", "user", "password"),
        "select * from person where ID >= ? AND ID <= ?", 1, 100, 1,
        new ResultObjectMapper<>(Person.class), ClassManifestFactory$.MODULE$.fromClass(Person.class));

long count = jdbcRDD.count();
assert count > 0;

log.debug("=================" + count);

Person[] result = (Person[])jdbcRDD.collect();
log.debug("======" + result);
sc.close();
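As far as I can tell from the Spark source, JdbcRDD divides the inclusive range from lowerBound to upperBound evenly across numPartitions and binds each sub-range to the two ? placeholders in the query. The following sketch reproduces that arithmetic in plain Java. It is an approximation for illustration (Spark itself uses arbitrary-precision integers to avoid overflow), and `JdbcRangeSketch` is a made-up name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class JdbcRangeSketch {
    /** Returns one inclusive {start, end} pair per partition. */
    static List<long[]> ranges(long lowerBound, long upperBound, int numPartitions) {
        List<long[]> result = new ArrayList<>();
        long length = 1 + upperBound - lowerBound; // both bounds are inclusive
        for (int i = 0; i < numPartitions; i++) {
            long start = lowerBound + (i * length) / numPartitions;
            long end = lowerBound + ((i + 1) * length) / numPartitions - 1;
            result.add(new long[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        // The call in the notes: bounds 1..100 with a single partition
        for (long[] r : ranges(1, 100, 1)) {
            System.out.println(Arrays.toString(r));
        }
        // The same bounds spread over four partitions
        for (long[] r : ranges(1, 100, 4)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

With one partition the whole range 1..100 goes into a single query, so rows whose ID falls outside the bounds are never read.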

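As shown above, all of these options are defined by JdbcRelationProvider. For a partitioned read, partitionColumn, lowerBound, upperBound, and numPartitions must all be specified together.

The DataSource class defines backwardCompatibilityMap, which maps legacy import format names to the current data source implementations: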

  /** A map to maintain backward compatibility in case we move data sources around. */
  private val backwardCompatibilityMap: Map[String, String] = {
    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
    val json = classOf[JsonFileFormat].getCanonicalName
    val parquet = classOf[ParquetFileFormat].getCanonicalName
    val csv = classOf[CSVFileFormat].getCanonicalName
    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
    val nativeOrc = classOf[OrcFileFormat].getCanonicalName

    Map(
      "org.apache.spark.sql.jdbc" -> jdbc,
      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
      "org.apache.spark.sql.json" -> json,
      "org.apache.spark.sql.json.DefaultSource" -> json,
      "org.apache.spark.sql.execution.datasources.json" -> json,
      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
      "org.apache.spark.sql.parquet" -> parquet,
      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
      "org.apache.spark.sql.hive.orc" -> orc,
      "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
      "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
      "org.apache.spark.ml.source.libsvm" -> libsvm,
      "com.databricks.spark.csv" -> csv
    )
  }

As shown above, the map covers the basic built-in import formats.

  1. Exporting data to files
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));

// Build the Encoder
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create a Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);

DataFrameWriter<Person> writer = personDataset.write();
// Save as CSV
writer.csv("./person.csv");
// Save as JSON
writer.json("./person.json");
// Save in the default format, Parquet
writer.save("./person");
sc.close();

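DataFrameWriter writes output much like Hadoop storage: each target path is a directory with the data split into one file per partition.
The reader returned by SQLContext.read() provides json, parquet, csv, and similar methods that load files into a Dataset<Row>.

  1. Exporting data to a database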
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Person> personList = new ArrayList<>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("zhangsan", 21));
personList.add(new Person("lisi", 23));
personList.add(new Person("wangwu", 60));

// Build the Encoder
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// Create a Dataset from the list
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);

DataFrameWriter<Person> writer = personDataset.write();

String url = "jdbc:mysql://172.19.xxx.xxx:3306/sctest";
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "passwd123");
properties.put("driver", "com.mysql.jdbc.Driver");
// Creates the table automatically
writer.jdbc(url, "person", properties);

String url1 = "jdbc:mysql://172.19.xxx.xxx:3306/sctest";
Properties properties1 = new Properties();
properties1.put("user", "root");
properties1.put("password", "passwd123");
properties1.put("driver", "com.mysql.jdbc.Driver");
// Append rows to the existing table
writer.mode("append").jdbc(url1, "person", properties1);

sc.close();
  1. Reading from and writing to HBase

Before anything else, create the table in HBase through Phoenix:

$ sqlline.py am.xxx.com
0: jdbc:phoenix:am.xxx.com> create table t_person (id BIGINT NOT NULL PRIMARY KEY, name VARCHAR, num INTEGER);
0: jdbc:phoenix:am.xxx.com> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-+
| TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-+
|            | SYSTEM       | CATALOG     | SYSTEM TABLE  |          |            |                            |                 |              | |
|            | SYSTEM       | FUNCTION    | SYSTEM TABLE  |          |            |                            |                 |              | |
|            | SYSTEM       | LOG         | SYSTEM TABLE  |          |            |                            |                 |              | |
|            | SYSTEM       | SEQUENCE    | SYSTEM TABLE  |          |            |                            |                 |              | |
|            | SYSTEM       | STATS       | SYSTEM TABLE  |          |            |                            |                 |              | |
|            |              | T_PERSON    | TABLE         |          |            |                            |                 |              | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-+
  • Reading from HBase
Dataset<Row> jdbc = sparkSession.read().format("jdbc")
       .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
       .option("url", "jdbc:phoenix:am.xxx.com,an1.xxx.com,an2.xxx.com:2181:/hbase-unsecure")
       .option("fetchsize", "500")
       .option("dbtable", "t_person").load();
jdbc.show(5);
  • Saving to HBase
List<Person> personList = new ArrayList<>();
personList.add(new Person(1, "zhangsan", 22));
personList.add(new Person(2, "lisi", 40));
personList.add(new Person(3, "zhangsan", 21));
personList.add(new Person(4, "lisi", 23));
personList.add(new Person(5, "wangwu", 60));

// Create a Dataset from the list
Dataset<Person> personDataset = sparkSession.createDataset(personList, Encoders.bean(Person.class));
personDataset.printSchema();
// Write to the Phoenix table
personDataset.write()
        .format("org.apache.phoenix.spark")
        .mode(SaveMode.Overwrite)
        .option("table", "t_person")
        .option("zkUrl", "am.xxx.com,an1.xxx.com,an2.xxx.com:2181:/hbase-unsecure")
        .save();
  1. Custom UDFs
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<Weather> weatherList = new ArrayList<>();
weatherList.add(new Weather("jinan", 22));
weatherList.add(new Weather("lijiang", 25));
weatherList.add(new Weather("hainan", 31));
weatherList.add(new Weather("shangtou", 23));
weatherList.add(new Weather("beijing", 60));

// Build the Encoder
Encoder<Weather> weatherEncoder = Encoders.bean(Weather.class);
// Create a Dataset from the list
Dataset<Weather> weatherDataset = sqlContext.createDataset(weatherList, weatherEncoder);
// Convert Dataset<Weather> to Dataset<Row>
// Note: toDF assigns the names by position, so their order must match the columns derived from the Weather class
Dataset<Row> weatherDatasetRow = weatherDataset.toDF("city", "degree");
// Register a temporary view
weatherDatasetRow.createOrReplaceTempView("weather");
// Register a UDF that converts Celsius to Fahrenheit
sqlContext.udf().register("CTOF", (UDF1<Integer, Double>) degreesCelsius -> ((degreesCelsius * 9.0 / 5.0) + 32.0), DataTypes.DoubleType);
// Apply the UDF to the numeric degree column
sqlContext.sql("select city, CTOF(degree) from weather").show();
sc.close();
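The body of a UDF is an ordinary function, so its formula can be checked without starting Spark. The sketch below uses java.util.function.Function as a stand-in for Spark's UDF1 interface, which is an assumption made purely to keep the example dependency-free; `CelsiusToFahrenheit` is a hypothetical name.

```java
import java.util.function.Function;

final class CelsiusToFahrenheit {
    /** Same formula as the CTOF UDF: F = C * 9 / 5 + 32. */
    static final Function<Integer, Double> CTOF =
            degreesCelsius -> (degreesCelsius * 9.0 / 5.0) + 32.0;

    public static void main(String[] args) {
        // The degree values from the weather sample data
        int[] samples = {22, 25, 31, 23, 60};
        for (int celsius : samples) {
            System.out.println(celsius + " C = " + CTOF.apply(celsius) + " F");
        }
    }
}
```

Because the function takes an Integer, it has to be applied to a numeric column such as degree when used in a query.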

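If no save mode is set on the writer, it creates the table automatically and fails if the table already exists; if a mode is set, the writer follows that mode.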
