一、Program configuration
1. Pass arguments to the program; the jar to run and its arguments go last
spark-submit --master xxx demo.jar "arg1" "arg2"
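A minimal sketch of how the driver side might pick up those trailing arguments; the object name and argument handling below are assumptions for illustration, not from the original job.
// Hypothetical driver entry point: args(0) and args(1) receive "arg1" and "arg2" from spark-submit
object Demo {
  def main(args: Array[String]): Unit = {
    val arg1 = if (args.length > 0) args(0) else ""
    val arg2 = if (args.length > 1) args(1) else ""
    println(s"arg1=$arg1, arg2=$arg2")
  }
}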
2. Import the SQL functions package
import org.apache.spark.sql.functions._
3. Start a spark-shell session named "zy test" on the test queue
spark2-shell --name "zy test" --master yarn --queue test
4. Kill a running application
yarn application -kill application_1544177774251_30331
二、DataFrame syntax
1. Equivalent of SQL NVL
withColumn("newid",when($"old_device_number".isNotNull,$"old_device_number").otherwise($"android_all"))
2. Equivalent of SQL split
val df2 = Rdd_04.withColumn("android_all", split($"device_id_raw", "\\|")(1))
3. Equivalent of SQL GROUP BY
val pv=df1.groupBy().count().withColumnRenamed("count","pv").withColumn("key1",lit("1"))
val df2 = df1.groupBy("device_number","visit").sum("duration").withColumnRenamed("sum(duration)", "duration")
val mdf31 = mdf3.groupBy("create_time").agg(countDistinct('order_id),countDistinct('user_id),sum('realpay)).withColumnRenamed("create_time","pay_time")
4. Window functions
import org.apache.spark.sql.expressions.Window
val byKey = Window.orderBy('duration)
val df3 = df2.withColumn("percent_rank", percent_rank over byKey)
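Windows can also be partitioned; a sketch (the device_number column is an assumption) that numbers rows per device by descending duration, reusing the Window import above.
// Hypothetical partitioned window: one row-number sequence per device_number
val byDevice = Window.partitionBy('device_number).orderBy('duration.desc)
val ranked = df2.withColumn("rn", row_number() over byDevice)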
5. Filtering with and without passed-in parameters, using where and filter
# Filter on multiple conditions
item_view.filter($"iid" === 1619094 && $"uid" === 2338528).orderBy('iid, 'uid, 'time) show
item_view.filter('iid === 1622749 && 'uid === 1658732).orderBy('iid, 'uid, 'time) show
val ind: Int = 2
df.filter($"num" === ind)
val str = "a"
df.filter($"id".equalTo(str))
val df41 = df4.where(" pid like '%newcomerActivity%' ")
df41.filter( "platform = 2")
6. Column arithmetic; don't forget the $ prefix, or the column may not be resolved
val df22 = df21.withColumn("pct",$"sum(realpay)" / $"count(DISTINCT user_id)")
7. Joins, whether the join columns have different names or the same name
val day_order = df31.join(df22,df31("create_time") === df22("pay_time"),"left")
val day_visit = df41.join(df43,Seq("key"),"left").join(df45,Seq("key"),"left")
8. Add a column from a literal: arbitrary text, or the passed-in parameter minputTime
val pv = df1.groupBy().count().withColumnRenamed("count", "pv").withColumn("key1", lit("1"))
val mdf45 = mdf44.groupBy().agg(countDistinct('device_number), count('device_number)).withColumn("key", lit(minputTime))
9. Get the first-of-month date and today's date
import java.text.SimpleDateFormat
import java.util.Calendar
val mf_sdf = new SimpleDateFormat("yyyy-MM-01")
val cal = Calendar.getInstance()
var minputTime = mf_sdf.format(cal.getTime)
val sdf = new SimpleDateFormat("yyyy-MM-dd")
var inputTime = sdf.format(cal.getTime)
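If a different reference day is needed (for example yesterday), Calendar.add can shift the calendar before formatting; the minus-one-day offset below is only an illustration.
// Hypothetical example: format yesterday's date, then shift back so cal still holds today
cal.add(Calendar.DATE, -1)
val yesterdayTime = sdf.format(cal.getTime)
cal.add(Calendar.DATE, 1)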
10. Print a DataFrame's schema
personDF.printSchema
11. Example of using map on a DataFrame
The code below can be run once the basic setup above has been executed.
val d_dev_sql = s""" select uid,device_number from dw.dwd_device_app_info_di where ds='$inputTime' and uid is not null and uid <>"" and uid <>0 group by uid,device_number """
val d_dev = spark.sql(d_dev_sql)
d_dev.map(f => "uid:" + f(0)).show
d_dev.map(f => "uid:" + f.getAs[String]("uid")).show
12. CASE WHEN
# CASE WHEN, version 1 (note: this line is PySpark syntax)
df = df.withColumn('mod_val_test1',F.when(df['rand'] <= 0.35,1).when(df['rand'] <= 0.7, 2).otherwise(3))
A Scala line that was actually executed:
val accu_purchase2 = accu_purchase1.withColumn("is_pay",when(accu_purchase1("total_pay_cnt") === 0,0).otherwise(1))
14. Add a sequence number in sort order: convert to an RDD, zipWithIndex, then convert back
Assign the sorted DataFrame that needs the sequence number to the variable dataframe below; the index will then follow that sort order.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// Add an "id" column on top of the original schema
val schema: StructType = dataframe.schema.add(StructField("id", LongType))
// Convert the DataFrame to an RDD, then call zipWithIndex
val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()
val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))
// Convert the indexed RDD back into a DataFrame
val df2 = spark.createDataFrame(rowRDD, schema)
df2.show()
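An alternative sketch without the RDD round-trip: a row_number window also produces a sequence number, though with no partitionBy it pushes all rows through a single partition. The duration sort column is an assumption.
import org.apache.spark.sql.expressions.Window
// Hypothetical alternative: 0-based id from row_number, ordered by duration
val withId = dataframe.withColumn("id", row_number().over(Window.orderBy('duration)) - 1)
withId.show()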
15. Parameter passing and rounding
val max_id = df2.count()
val df3 = df2.withColumn("rank",round(($"id" / max_id),0))
16. Change a column's data type
import org.apache.spark.sql.types.DataTypes
val order_item2 = order_item.withColumn("realpay", $"realpay".cast(DataTypes.LongType))
Spark SQL data types
Numeric types
ByteType: a 1-byte integer, range -128 to 127
ShortType: a 2-byte integer, range -32768 to 32767
IntegerType: a 4-byte integer, range -2147483648 to 2147483647
LongType: an 8-byte integer, range -9223372036854775808 to 9223372036854775807
FloatType: a 4-byte single-precision floating-point number
DoubleType: an 8-byte double-precision floating-point number
DecimalType: an arbitrary-precision decimal, backed internally by java.math.BigDecimal (an arbitrary-precision unscaled integer value plus a 32-bit integer scale)
StringType: a string value
BinaryType: a byte-sequence value
BooleanType: a boolean value
Datetime types
TimestampType: a value with year, month, day, hour, minute, and second fields
DateType: a value with year, month, and day fields
Source: https://blog.csdn.net/Android_xue/article/details/100975387
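A small sketch putting several of these types together to build a schema by hand; the field names and types are made up for illustration.
import org.apache.spark.sql.types._
// Hypothetical schema combining several Spark SQL types
val demoSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType),
  StructField("realpay", DecimalType(10, 2)),
  StructField("is_pay", BooleanType),
  StructField("pay_time", TimestampType)
))
demoSchema.printTreeString()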
17. Convert to key-value form, with the value as a JSON string
import org.apache.spark.sql.functions.to_json
val df = spark.sql("""select order_item_id,(named_struct('item_id',item_id,'title',title)) c1
from dw.dwd_order_item_dd where ds='2019-11-24' limit 10""")
val df2 = df.withColumn("c2", to_json($"c1"))
df2.cache()
df2.show(truncate=false)
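Going the other way, from_json can parse the JSON string back into a struct; a sketch reusing df2 above, where the field types (LongType for item_id, StringType for title) are assumptions.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
// Hypothetical schema for the JSON produced above; adjust the types to match the real table
val itemSchema = StructType(Seq(
  StructField("item_id", LongType),
  StructField("title", StringType)
))
val df3 = df2.withColumn("c3", from_json($"c2", itemSchema))
df3.select("order_item_id", "c3.item_id", "c3.title").show(false)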
18. A pitfall when setting the reference time
package cn.idongjia.statistics.auciton
import cn.idongjia.statistics.util.{Common, Config,DBUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val s_sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val mill_seconds_of_day: Long = 3600 * 24 * 1000
val m_sdf = new SimpleDateFormat("yyyy-MM")
var cal = Calendar.getInstance()
var inputTime = sdf.format(cal.getTime)
var endTime = sdf.format(cal.getTime)
# The key is the next line, cal.setTime(sdf.parse(inputTime)); without it, the calendar keeps the current timestamp instead of the parsed date
cal.setTime(sdf.parse(inputTime))
val from_time = cal.getTimeInMillis
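To make the pitfall concrete: a fresh calendar still holds "now", so its getTimeInMillis is the current timestamp, while after setTime it is midnight of inputTime. The to_time name below is a hypothetical way the mill_seconds_of_day constant defined above could close a one-day window.
// Without setTime: current timestamp, not a day boundary
val nowMillis = Calendar.getInstance().getTimeInMillis
// With setTime (as above): midnight of inputTime; add one day's milliseconds for the window end
val to_time = from_time + mill_seconds_of_day
println(s"now=$nowMillis, window=[$from_time, $to_time)")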
19. Sorting
item_view.orderBy('uid, 'iid, 'type.desc).show
Reading the documentation
Vocabulary notes
Interfaces: 接口
Optimizations: 优化
interact: 相互作用
Benefits: 实惠
Manipulated: 任人摆布
manipulation: 操纵
Dynamic: 动态
i.e.:即
conceptually: 概念地
under the hood:底层
builtin: 执行内建的函数
domain: 领域
referred as: 称为
in contrast to: 和
come with: 伴随
Serialization: 序列化
Verbose: 冗长
Inferring: 推理
implicitly: 暗含的
implicit: 隐式
subsequent: 随后的
explicitly: 明确地
Encoders: 编码器
retrieves multiple columns at once:同时检索多个列
specifying: 说明
parsed: 被解析的
predefined: 预定义的
identical: 同一的
deploying:部署
Stocks
Write up some daily post-market stock review notes.