1. Exception when converting an RDD to a Dataset: Caused by: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
The exception message is shown in the screenshot below:
[Screenshot: exception stack trace]
Conversion code:
JavaPairRDD<Long,Tuple2<Row,Row>> joinedRDD = clickActionRDD.join(cityInfoRDD);
System.out.println("joinedRDD:"+joinedRDD.count());
JavaRDD<Row> mappedRDD = joinedRDD.map(data->{
    long cityid = data._1;
    Row clickAction = data._2._1;
    Row cityInfo = data._2._2;
    long productid = clickAction.getLong(1);
    String cityName = cityInfo.getString(1);
    String area = cityInfo.getString(2);
    return RowFactory.create(cityid,productid,cityName,area);
});
System.out.println("mappedRDD:"+mappedRDD.count());
mappedRDD.foreach(x-> System.out.println(x));
// With the data in JavaRDD<Row> form, it can be converted to a DataFrame
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("city_id",DataTypes.LongType,true));
structFields.add(DataTypes.createStructField("city_name",DataTypes.StringType,true));
structFields.add(DataTypes.createStructField("area",DataTypes.StringType,true));
structFields.add(DataTypes.createStructField("product_id",DataTypes.LongType,true));
StructType schema = DataTypes.createStructType(structFields);
Dataset<Row> df = sparkSession.createDataFrame(mappedRDD, schema);
System.out.println("tmp_click_product_basic:"+df.count());
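// Register the DataFrame as a temporary view (tmp_click_product_basic);
// createOrReplaceTempView replaces the deprecated registerTempTable
df.createOrReplaceTempView("tmp_click_product_basic");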
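I spent a long time checking the field types and they all looked correct. The real cause turned out to be that the order of the values packed into each Row (cityid, productid, cityName, area) differed from the order of the fields added to structFields (city_id, city_name, area, product_id). Values are matched to schema fields by position, so the Long productid was checked against the StringType field city_name, which is exactly what the exception reports. After making the two orders agree, the code is as shown in the screenshot below:
[Screenshot: corrected code]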
Running the code again, it works correctly.
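To make the positional mismatch easier to see, here is a minimal plain-Java sketch that models the check. It is not Spark's actual implementation, and it is not the author's corrected code. The class name RowOrderCheck, the method firstMismatch, and the sample values are hypothetical; only the two field orders come from the code above.

```java
import java.util.Arrays;
import java.util.List;

class RowOrderCheck {

    // Returns the index of the first value whose runtime class does not match
    // the class expected at that position, or -1 if every position matches.
    static int firstMismatch(List<Class<?>> expected, List<Object> row) {
        if (expected.size() != row.size()) {
            throw new IllegalArgumentException("field count differs from schema");
        }
        for (int i = 0; i < expected.size(); i++) {
            Object value = row.get(i);
            // null is accepted because every field in the post's schema is nullable
            if (value != null && !expected.get(i).isInstance(value)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Schema order from the post: city_id, city_name, area, product_id
        List<Class<?>> schema =
                Arrays.asList(Long.class, String.class, String.class, Long.class);

        // Row order from the post: cityid, productid, cityName, area (made-up values)
        List<Object> original = Arrays.asList(1L, 100L, "Beijing", "North");
        // The same values reordered to follow the schema
        List<Object> reordered = Arrays.asList(1L, "Beijing", "North", 100L);

        System.out.println(firstMismatch(schema, original));  // prints 1
        System.out.println(firstMismatch(schema, reordered)); // prints -1
    }
}
```

Index 1 is where productid (a Long) lands in the city_name (StringType) slot. In the Spark code, either passing cityid, cityName, area, productid to RowFactory.create, or declaring the fields in the order city_id, product_id, city_name, area, makes the two orders agree.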