美文网首页
spark UDF2 使用

spark UDF2 使用

作者: 夜空最亮的9星 | 来源:发表于2019-08-28 15:44 被阅读0次
class MyUDF2 extends UDF2[String,Int,String]{
  
  override def call(t1: String, t2: Int): String = {

    val result = t1.concat(" and ").concat(t2.toString)

    result
  }
}



object UdfDemoTest {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("UdfDemoTest")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
    //创建测试df
    val userDF = sc.parallelize(userData).toDF("name", "age")
    +-----+---+
    |name |age|
    +-----+---+
    |Leo  |16 |
    |Marry|21 |
    |Jack |14 |
    |Tom  |18 |
    +-----+---+
    // 注册一张user表
    userDF.registerTempTable("user")

    val myTest = new MyUDF2

    sqlContext.udf.register("test",myTest ,DataTypes.StringType)

    sqlContext.sql("select *, test(name,age) as n from user").show

    +-----+---+------------+
    | name|age|           n|
    +-----+---+------------+
    |  Leo| 16|  Leo and 16|
    |Marry| 21|Marry and 21|
    | Jack| 14| Jack and 14|
    |  Tom| 18|  Tom and 18|
    +-----+---+------------+

    sc.stop()
    
    }
}

另参考

import com.alibaba.fastjson.JSON
import org.apache.spark.sql.api.java.UDF2

/**
  * UDF2<String, String, String>
  * 前两个类型是值传进来的值的类型
  * 第一个类型代表json格式的字符串
  * 第二个代表要获取字段值的字段名称
  * 第三个类型代表返回json串里的某个字段的值
  */
class GetJsonObjectUDF extends UDF2[String,String,String] {
  override def call(json: String, field: String): String = {
    try {
      val jsonObject = JSON.parseObject(json)
      return jsonObject.getString(field)
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
    null
  }
}


数据如下:
+---------+---------------------------------+
|id       |lables                           |
+---------+---------------------------------+
|786      |[1829, 42092, 1766, 179, 1769]   |
|185      |[42059, 1748, 1787, 42092]       |
|324      |[42059, 1748, 122, 1766]         |
|541      |[1763, 42092, 146, 1766, 1775]   |
|143      |[1763, 42092, 146, 1814,  42116] |
|572      |[1829, 42092, 1766, 42086, 1769] |
|140      |[1778, 1748, 1787, 42059]        |
|184      |[1829, 1763, 42092, 1766 179]    |

选出lables标签中包含1766,179的数据

selectDFWithLables(hiveContext,df1,"1766,179")


  //  根据 DFrame 的lables array<String> 取数据
  def selectDFWithLables(hiveContext: HiveContext, dataframe1: DataFrame, lables: String) = {
    import hiveContext.implicits._
    val tmpDF = dataframe1.withColumn("iscontains", lable_array_contains(col("lables"), lit(lables)))
    val tmpSelectDF = tmpDF.filter($"iscontains" === true)
    val resultDF = tmpSelectDF.drop("iscontains")
    resultDF

  }

  /**
    * 自定义函数,判断labels列是否包含字符串中的内容
    */

  def lable_array_contains: UserDefinedFunction = udf[Boolean, collection.mutable.WrappedArray[String], String] {
    (list1, str) => {
      val list2 = str.split(",")
      var ifExist = false
      var flag = true

      if (list1.isEmpty) flag = false

      for (field <- list2 if flag) {
        if (list1.contains(field)) {
          ifExist = true
          flag = false
        }
      }
      ifExist
    }
  }

相关文章

网友评论

      本文标题:spark UDF2 使用

      本文链接:https://www.haomeiwen.com/subject/iwqmectx.html