好久没有发布文章,今天抽点时间写个 spark 自定义 UDF
函数,使用 sparksql
实现数据内部的两两比较的功能。
首先实现 Hamming 距离
UDF, 具体实现如下:
/**
* hamming 距离
*/
public class HammingDistanceUDF implements UDF2<Object, Object, Integer> {
@Override
public Integer call(Object o1, Object o2) throws Exception {
if (o1 instanceof BigInteger && o2 instanceof BigInteger) {
return ((BigInteger) o1).xor(((BigInteger) o2)).bitCount();
} else if (o1 instanceof Integer && o2 instanceof Integer) {
return Integer.bitCount((((int) o1) ^ (((int) o2))));
} else if (o1 instanceof Long && o2 instanceof Long) {
return Long.bitCount((((long) o1) ^ (((long) o2))));
} else if (o1 instanceof String && o2 instanceof String) {
int distance = 0;
String s1 = (String) o1;
String s2 = (String) o2;
int len = Math.max(s1.length(), s2.length());
char chz1, chz2;
for (int i = 1; i <= len; i++) {
if (s1.length() >= i) {
chz1 = s1.charAt(s1.length() - i);
} else {
distance++;
continue;
}
if (s2.length() >= i) {
chz2 = s2.charAt(s2.length() - i);
} else {
distance++;
continue;
}
if (chz1 != chz2) {
distance++;
}
}
return distance;
}
return null;
}
}
如上所示,实现了两数字,字符串的 hamming距离
,这时候,注册到 sqlContext
中即可使用,如下:
sparkSession.sqlContext().udf().register("hamming", new HammingDistanceUDF(), DataTypes.IntegerType);
简单建立个小表验证一下,这里直接使用 cross join
实现笛卡尔集生成,并使用 hamming
:
sparkSession = SparkSession
.builder()
.master("local[*]") // local test
.appName("hiveJob")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
List<Person> personList = new ArrayList<Person>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("wangwu", 21));
personList.add(new Person("zhouliu", 23));
personList.add(new Person("tianqi", 60));
personList.add(new Person("wangliu", 23));
// 构建基础 Encoder 类
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// 根据数组创建 Dataset 对象
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
// 由 Dataset<Person> 转化为 Dataset<Row>
// 注 这里的顺序需要与 Person 类保持一致
Dataset<Row> personDatasetRow = personDataset.toDF("name", "num");
// 增加id字段
Dataset<Row> personDatasetRowWithId = personDatasetRow.withColumn("id", functions$.MODULE$.monotonicallyIncreasingId());
// 注册临时表
personDatasetRowWithId.createOrReplaceTempView("people");
// 注册 UDF 函数
sparkSession.sqlContext().udf().register("hamming", new HammingDistanceUDF(), DataTypes.IntegerType);
// sparksql 使用 cross join 方式比较 name 字段的距离
Dataset<Row> dataset = sparkSession.sql("select * from (select p1.id as id1, " +
"p1.name as name1, " +
"p2.id as id2, " +
"p2.name as name2, " +
"hamming(p1.name, p2.name) as distance " +
"from people p1 cross join people p2) where distance < 5 and id1 != id2");
dataset.show();
如上所示,基本能完成字段内部的数据比较功能
网友评论