scala spark

Author: 叫兽吃橙子 | Published 2018-12-26 15:54

    I. Program configuration

    1. Passing arguments to the program; the jar to run and its arguments go last

    spark-submit --master xxx demo.jar "arg1" "arg2"
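
    A minimal sketch of how the application might read these arguments (the object name and what the arguments mean are assumptions for illustration):

    object Demo {
      def main(args: Array[String]): Unit = {
        // args(0) == "arg1", args(1) == "arg2", in the order they follow the jar
        val inputTime = args(0)
        val queueName = args(1)
        println(s"inputTime=$inputTime, queueName=$queueName")
      }
    }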
    

    2. Importing the SQL functions package

    import org.apache.spark.sql.functions._
    

    3. Starting a spark-shell session named "zy test", on the test queue

    spark2-shell --name "zy test" --master yarn --queue test
    

    4. Killing a running application

    yarn application -kill application_1544177774251_30331
    

    II. DataFrame syntax

    1. Equivalent of SQL NVL

    withColumn("newid",when($"old_device_number".isNotNull,$"old_device_number").otherwise($"android_all"))
    

    2. Equivalent of SQL split

    val df2 = Rdd_04.withColumn("android_all", split($"device_id_raw", "\\|")(1))
    

    3. Equivalent of SQL GROUP BY

    val pv=df1.groupBy().count().withColumnRenamed("count","pv").withColumn("key1",lit("1"))
    
    val df2 = df1.groupBy("device_number","visit").sum("duration").withColumnRenamed("sum(duration)", "duration")
    
    val mdf31 = mdf3.groupBy("create_time").agg(countDistinct('order_id),countDistinct('user_id),sum('realpay)).withColumnRenamed("create_time","pay_time")
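
    As a small aside (not from the original post), aggregate columns can also be renamed inline with alias, which avoids the extra withColumnRenamed step:

    val mdf31b = mdf3.groupBy("create_time")
      .agg(
        countDistinct('order_id).alias("order_cnt"),
        countDistinct('user_id).alias("user_cnt"),
        sum('realpay).alias("realpay_sum"))
      .withColumnRenamed("create_time", "pay_time")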
    
    

    4. Window functions

    import org.apache.spark.sql.expressions.Window

    val byKey = Window.orderBy('duration)
    val df3 = df2.withColumn("percent_rank", percent_rank over byKey)
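
    A related sketch with a partitioned window, e.g. numbering rows per device; the column names follow the earlier examples and are assumptions:

    import org.apache.spark.sql.functions.row_number

    // One ranking sequence per device_number, longest duration first
    val byDevice = Window.partitionBy('device_number).orderBy('duration.desc)
    val ranked = df2.withColumn("rn", row_number().over(byDevice))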
    

    5. Filtering with where and filter, with or without passed-in parameters

    // Filter on multiple conditions
    item_view.filter($"iid" === 1619094 && $"uid" === 2338528).orderBy('iid, 'uid, 'time).show
    item_view.filter('iid === 1622749 && 'uid === 1658732).orderBy('iid, 'uid, 'time).show
    
    // Filter on a passed-in variable
    val ind: Int = 2
    df.filter($"num" === ind)
    
    val str = "a"
    df.filter($"id".equalTo(str))
    
    val df41 = df4.where(" pid like '%newcomerActivity%' ")
    df41.filter( "platform = 2")
    

    6. Column arithmetic; do not forget the $ prefix, otherwise the column may not be recognized

    val df22 = df21.withColumn("pct",$"sum(realpay)" / $"count(DISTINCT user_id)")
    

    7. Joins, whether the join columns have different or identical names

    val day_order = df31.join(df22,df31("create_time") === df22("pay_time"),"left")
    val day_visit = df41.join(df43,Seq("key"),"left").join(df45,Seq("key"),"left")
    

    8. Adding a column holding an arbitrary literal, or the passed-in parameter minputTime

     val pv = df1.groupBy().count().withColumnRenamed("count", "pv").withColumn("key1", lit("1"))
     val mdf45 = mdf44.groupBy().agg(countDistinct('device_number), count('device_number)).withColumn("key", lit(minputTime))
    

    9. Getting the first day of the current month / the current date

    import java.text.SimpleDateFormat
    import java.util.Calendar

    val mf_sdf = new SimpleDateFormat("yyyy-MM-01")
    val cal = Calendar.getInstance()
    var minputTime = mf_sdf.format(cal.getTime)
    
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    var inputTime = sdf.format(cal.getTime)
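
    A small extension sketch (not in the original post) that shifts the calendar to get yesterday's date or the first day of the previous month:

    // Yesterday's date (mutates cal, so do this after formatting inputTime)
    cal.add(Calendar.DATE, -1)
    val yesterdayTime = sdf.format(cal.getTime)

    // First day of the previous month, using a fresh calendar
    val cal2 = Calendar.getInstance()
    cal2.add(Calendar.MONTH, -1)
    val prevMonthFirst = mf_sdf.format(cal2.getTime)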
    

    10. Printing a DataFrame's schema

    personDF.printSchema
    

    11. Example of using map on a DataFrame
    The code below can be run once the setup from the earlier sections has been executed.

    val d_dev_sql = s""" select uid,device_number from dw.dwd_device_app_info_di where ds='$inputTime' and uid is not null and uid <>"" and uid <>0 group by uid,device_number """
    val d_dev = spark.sql(d_dev_sql)
    d_dev.map(f => "uid:" + f(0)).show
    d_dev.map(f => "uid:" + f.getAs[String]("uid")).show
    

    12. CASE WHEN

    # CASE WHEN, version 1 (note: this snippet is PySpark)
    df = df.withColumn('mod_val_test1',F.when(df['rand'] <= 0.35,1).when(df['rand'] <= 0.7, 2).otherwise(3))
    
    A Scala version that was actually executed:
        val accu_purchase2 = accu_purchase1.withColumn("is_pay",when(accu_purchase1("total_pay_cnt") === 0,0).otherwise(1))
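
    For reference, a minimal Scala sketch of the same multi-branch CASE WHEN shown in the PySpark snippet above (the column name rand is carried over as an assumption):

    val df_case = df.withColumn("mod_val_test1",
      when($"rand" <= 0.35, 1)
        .when($"rand" <= 0.7, 2)
        .otherwise(3))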
    

    14. Adding a sequence number after sorting: convert to an RDD, zipWithIndex, then convert back

    Assign the already-sorted DataFrame that needs a sequence number to the variable dataframe below; the new id column then follows that order.
        import org.apache.spark.rdd.RDD
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types.{LongType, StructField, StructType}

        // Add an "id" column to the existing schema
        val schema: StructType = dataframe.schema.add(StructField("id", LongType))
    
        // Convert the DataFrame to an RDD and call zipWithIndex
        val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()
    
        val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))
    
        // Convert the index-augmented RDD back to a DataFrame
        val df2 = spark.createDataFrame(rowRDD, schema)
    
        df2.show()
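
    As an alternative sketch (my addition, not from the original post), the same effect can often be achieved without the RDD round trip by using a window function; note that row_number starts at 1, and that a window with no partitionBy pulls all rows into a single partition:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    // "duration" stands in for whatever column the DataFrame was sorted by
    val w = Window.orderBy('duration)
    val indexed = dataframe.withColumn("id", row_number().over(w) - 1)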
    

    15. Passing a parameter and rounding

        val max_id = df2.count()
        val df3 = df2.withColumn("rank",round(($"id" / max_id),0))
    

    16. Changing a column's data type

    import org.apache.spark.sql.types.DataTypes

    val order_item2 = order_item.withColumn("realpay", $"realpay".cast(DataTypes.LongType))
    
    Spark SQL data types
    Numeric types
    ByteType: a 1-byte integer, range -128 to 127
    ShortType: a 2-byte integer, range -32768 to 32767
    IntegerType: a 4-byte integer, range -2147483648 to 2147483647
    LongType: an 8-byte integer, range -9223372036854775808 to 9223372036854775807
    FloatType: a 4-byte single-precision floating-point number
    DoubleType: an 8-byte double-precision floating-point number
    DecimalType: arbitrary-precision decimal values, backed internally by java.math.BigDecimal (an arbitrary-precision unscaled integer plus a 32-bit integer scale)
    StringType: a string value
    BinaryType: a byte-sequence value
    BooleanType: a boolean value
    Datetime types
    TimestampType: a value with year, month, day, hour, minute, and second fields
    DateType: a value with year, month, and day fields
    Source: https://blog.csdn.net/Android_xue/article/details/100975387
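
    A minimal sketch of casts to a few of the types listed above (the column names are illustrative assumptions):

    import org.apache.spark.sql.types.{DataTypes, StringType}

    // Cast by type object or by type-name string; both forms work
    val casted = order_item
      .withColumn("realpay_dec", $"realpay".cast(DataTypes.createDecimalType(18, 2)))
      .withColumn("create_time_ts", $"create_time".cast("timestamp"))
      .withColumn("platform_str", $"platform".cast(StringType))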
    

    17. Converting to key-value form, with the value as a JSON string

    import org.apache.spark.sql.functions.to_json
    val df = spark.sql("""select order_item_id,(named_struct('item_id',item_id,'title',title)) c1
                   from dw.dwd_order_item_dd where ds='2019-11-24' limit 10""")
    val df2 = df.withColumn("c2", to_json($"c1"))
    df2.cache()
    df2.show(truncate=false)
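
    For the reverse direction, a minimal sketch using from_json with an explicit schema (the schema below is an assumption that mirrors the named_struct above):

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    // The schema must match the JSON produced by to_json above
    val itemSchema = new StructType()
      .add("item_id", LongType)
      .add("title", StringType)

    val df3 = df2.withColumn("c3", from_json($"c2", itemSchema))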
    

    18. A pitfall when setting a reference time

    package cn.idongjia.statistics.auciton
    import cn.idongjia.statistics.util.{Common, Config,DBUtil}
    import org.apache.spark.sql.SparkSession
    
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Properties}
    
    
    
    
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    
    val s_sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    
    val mill_seconds_of_day: Long = 3600 * 24 * 1000
    
    val m_sdf = new SimpleDateFormat("yyyy-MM")
    
    
    var cal = Calendar.getInstance()
    var inputTime = sdf.format(cal.getTime)
    var endTime = sdf.format(cal.getTime)
    
    
    var cal = Calendar.getInstance()
    var inputTime = sdf.format(cal.getTime)
    // The key is the line below, cal.setTime(sdf.parse(inputTime));
    // without it, the calendar keeps the current wall-clock time instead of midnight of inputTime.
    cal.setTime(sdf.parse(inputTime))
    val from_time = cal.getTimeInMillis
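
    A small follow-on sketch (my assumption about the intent of mill_seconds_of_day above): deriving the end of the same day and filtering on a millisecond timestamp column, where event_time is a hypothetical column name:

    // End of the day = start of the day plus one day of milliseconds
    val to_time = from_time + mill_seconds_of_day

    val day_events = df.filter($"event_time" >= from_time && $"event_time" < to_time)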
    

    19. Sorting

    item_view.orderBy('uid, 'iid, $"type".desc).show
    

    Reading the documentation

    Vocabulary notes

    Interfaces: interfaces
    Optimizations: optimizations
    interact: to interact with
    Benefits: advantages
    Manipulated: operated on
    manipulation: manipulation
    Dynamic: dynamic
    i.e.: that is
    conceptually: conceptually
    under the hood: internally, at the implementation level
    builtin: built-in (function)
    domain: domain
    referred as: referred to as
    in contrast to:
    come with: to be accompanied by
    Serialization: serialization
    Verbose: verbose
    Inferring: inferring
    implicitly: implicitly
    implicit: implicit
    subsequent: subsequent
    explicitly: explicitly
    Encoders: encoders
    retrieves multiple columns at once: retrieves multiple columns at once
    specifying: specifying
    parsed: parsed
    predefined: predefined
    identical: identical
    deploying: deploying

    Stocks

    Some notes from daily stock reviews.

    Overall

    中钢天源

    0518
