美文网首页
spark 定制 UDF

spark 定制 UDF

作者: 走在成长的道路上 | 来源:发表于2019-11-25 13:50 被阅读0次

好久没有发布文章,今天抽点时间写个 spark 自定义 UDF 函数,使用 sparksql 实现数据内部的两两比较的功能。

首先实现 Hamming 距离 UDF, 具体实现如下:

/**
 * hamming 距离
 */
public class HammingDistanceUDF implements UDF2<Object, Object, Integer> {

    @Override
    public Integer call(Object o1, Object o2) throws Exception {
        if (o1 instanceof BigInteger && o2 instanceof BigInteger) {
            return ((BigInteger) o1).xor(((BigInteger) o2)).bitCount();
        } else if (o1 instanceof Integer && o2 instanceof Integer) {
            return Integer.bitCount((((int) o1) ^ (((int) o2))));
        } else if (o1 instanceof Long && o2 instanceof Long) {
            return Long.bitCount((((long) o1) ^ (((long) o2))));
        } else if (o1 instanceof String && o2 instanceof String) {
            int distance = 0;
            String s1 = (String) o1;
            String s2 = (String) o2;
            int len = Math.max(s1.length(), s2.length());
            char chz1, chz2;
            for (int i = 1; i <= len; i++) {
                if (s1.length() >= i) {
                    chz1 = s1.charAt(s1.length() - i);
                } else {
                    distance++;
                    continue;
                }
                if (s2.length() >= i) {
                    chz2 = s2.charAt(s2.length() - i);
                } else {
                    distance++;
                    continue;
                }
                if (chz1 != chz2) {
                    distance++;
                }
            }
            return distance;
        }
        return null;
    }

}

如上所示,实现了两数字,字符串的 hamming距离 ,这时候,注册到 sqlContext 中即可使用,如下:

sparkSession.sqlContext().udf().register("hamming", new HammingDistanceUDF(), DataTypes.IntegerType);

简单建立个小表验证一下,这里直接使用 cross join 实现笛卡尔集生成,并使用 hamming

sparkSession = SparkSession
                .builder()
                .master("local[*]")   // local test
                .appName("hiveJob")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();
List<Person> personList = new ArrayList<Person>();
personList.add(new Person("zhangsan", 22));
personList.add(new Person("lisi", 40));
personList.add(new Person("wangwu", 21));
personList.add(new Person("zhouliu", 23));
personList.add(new Person("tianqi", 60));
personList.add(new Person("wangliu", 23));

// 构建基础 Encoder 类
Encoder<Person> personEncoder = Encoders.bean(Person.class);
// 根据数组创建 Dataset 对象
Dataset<Person> personDataset = sqlContext.createDataset(personList, personEncoder);
// 由 Dataset<Person> 转化为 Dataset<Row>
// 注 这里的顺序需要与 Person 类保持一致
Dataset<Row> personDatasetRow = personDataset.toDF("name", "num");
// 增加id字段
Dataset<Row> personDatasetRowWithId = personDatasetRow.withColumn("id", functions$.MODULE$.monotonicallyIncreasingId());

// 注册临时表
personDatasetRowWithId.createOrReplaceTempView("people");
// 注册 UDF 函数
sparkSession.sqlContext().udf().register("hamming", new HammingDistanceUDF(), DataTypes.IntegerType);

// sparksql 使用 cross join 方式比较 name 字段的距离
Dataset<Row> dataset = sparkSession.sql("select * from (select p1.id as id1, " +
        "p1.name as name1, " +
        "p2.id as id2, " +
        "p2.name as name2, " +
        "hamming(p1.name, p2.name) as distance " +
        "from people p1 cross join people p2) where distance < 5 and id1 != id2");
dataset.show();

如上所示,基本能完成字段内部的数据比较功能

相关文章

网友评论

      本文标题:spark 定制 UDF

      本文链接:https://www.haomeiwen.com/subject/osndwctx.html