美文网首页
Spark Dataframe 自增ID (Java版)

Spark Dataframe 自增ID (Java版)

作者: user0650 | 来源:发表于2019-03-15 14:56 被阅读0次

背景

用Spark作数据计算框架,将计算结果写入传统关系数据库,例如MySQL,供业务查询,这是工作中经常使用的模式。

在写入MySQL时,经常要加个自增的ID字段。

第一种方案,可以手动创建数据表,定义自增ID字段,Spark写入时用追加模式,ID设为空即可。

第二种方案,Spark写之前就生成好自增ID,直接覆盖写入MySQL。

实际中,我们使用更多的是覆盖写入(自动创建表),所以本文介绍一下方案二的实现。

实现

  1. Schema添加一列:ID
DataFrame df = ...
StructType schema = df.schema().add(DataTypes.createStructField("id", DataTypes.LongType, false));
  1. 使用RDD的zipWithIndex得到索引,作为ID值:
JavaRDD<Row> rdd = df
    .javaRDD() // 转为JavaRDD
    .zipWithIndex() // 添加索引,结果为JavaPairRDD<Row, Long>,即行数据和对应的索引
    .map(new Function<Tuple2<Row, Long>, Row>() {
            @Override
            public Row call(Tuple2<Row, Long> v1) throws Exception {
                Object[] objects = new Object[v1._1.size() + 1];
                for (int i = 0; i < v1._1.size(); i++) {
                    objects[i] = v1._1.get(i);
                }
                objects[objects.length - 1] = v1._2;
                return RowFactory.create(objects);
            }
    }); // 把索引值作为ID字段值,构造新的行数据
  1. 将RDD再转回DataFrame
df = sqlContext.createDataFrame(rdd, schema);
  1. 使用Overwrite模式写入MySQL
Properties props = new Properties();
props.setProperty("user", "user");
props.setProperty("password", "password");
props.setProperty("driver", "com.mysql.jdbc.Driver"));
df
    .write()
    .mode(SaveMode.Overwrite) // 覆盖模式,自动创建表
    .jdbc("jdbcUrl", "tableName", props);

相关文章

网友评论

      本文标题:Spark Dataframe 自增ID (Java版)

      本文链接:https://www.haomeiwen.com/subject/smglmqtx.html