美文网首页MLSQLSpark 应用
mlsql流任务实现distinct

mlsql流任务实现distinct

作者: hongshen | 来源:发表于2019-06-27 17:33 被阅读24次

    流计算场景里distinct很常用,spark sql对 stream dataset不支持 SELECT COUNT(DISTINCT Company) FROM Orders这种用法,但是dataframe支持dropDuplicates,可以指定columns。下面要加上dropDuplicates的操作。

    通过et可以实现对dataframe的操作

    register语法

    register的语法是这样的:
    REGISTER format '.' path as functionName where? expression? booleanExpression*
    比如:register ScriptUDF.scriptTableas plusFun
    注册一个udf

    那参考这个语法,我想要对tablex去重,命令应该是:
    register DistinctExt.tablexas tbx where inputTable="{}" and columns="{}"
    as语句是语法要求,后面的tbx其实没有用处。

    register语法的原理

    image.png

    通过debug会发现,formatDistinctExtpathtablexoptionwhere后面的配置项们
    mlsql设计的ET是要实现一个SQLAlg接口:

    image.png
    这些接口起的名字似乎都跟机器学习有关,训练,预测,模型之类的,哈哈,个人感觉这块可以再细分下,给每一种功能单独设计一个接口,可以是SQLAlg的子类,SQLAlg本身不要定义那么多接口,比较容易理解。

    实现SQLDistinctExt

    我参考SQLSendMessage实现一个SQLDistinctExt,只需要把load方法实现下就行了

       override def load(spark: SparkSession, _path: String, params: Map[String, String]): Any = {
        val inputTable = params.getOrElse("inputTable", _path)
        val columns = params.getOrElse("columns", "")
        val df = spark.table(inputTable)
        if (columns.isEmpty) {
          df.dropDuplicates().createOrReplaceTempView(inputTable)
        } else {
          df.dropDuplicates(columns.split(",")).createOrReplaceTempView(inputTable)
        }
        null
      }
    

    将这个类放到合适的地方能够被mlsql找到

      def findAlg(name: String) = {
        mapping.get(name.capitalize) match {
          case Some(clzz) =>
            Class.forName(clzz).newInstance().asInstanceOf[SQLAlg]
          case None =>
            if (!name.contains(".") && (name.endsWith("InPlace") || name.endsWith("Ext"))) {
              Class.forName(s"streaming.dsl.mmlib.algs.SQL${name}").newInstance().asInstanceOf[SQLAlg]
            } else {
              try {
                Class.forName(name).newInstance().asInstanceOf[SQLAlg]
              }
              catch {
                case e: Exception =>
                  throw new RuntimeException(s"${name} is not found")
              }
            }
        }
      }
    

    还可以支持!这种宏,在CommandCollection中添加:

    image.png

    使用方法

    在流或者批任务里都可以使用哈,比如你经过处理得到一个table叫做t1,有a,b,c,d,e,f字段,根据a,b,c三个字段去重,你可以这样写

    !distinct t1 "a,b,c" 
    

    你再select t1会发现重复的已经去掉了,当然还可以设置watermarking

    相关文章

      网友评论

        本文标题:mlsql流任务实现distinct

        本文链接:https://www.haomeiwen.com/subject/oqsrcctx.html