
Spark Study Notes

Author: 小小兰哈哈 | Published 2022-01-03 11:09

    dataframe

    create

    Create a DataFrame (Scala example):

       val training = ss.createDataFrame(Seq(

         (1.0, Vectors.dense(0.0, 1.1, 0.1)),

         (0.0, Vectors.dense(2.0, 1.0, -1.0)),

         (0.0, Vectors.dense(2.0, 1.3, 1.0)),

         (1.0, Vectors.dense(0.0, 1.2, -0.5))

       )).toDF("label", "features")
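
    The rest of these notes use PySpark; a minimal PySpark sketch of the same DataFrame (assumes a SparkSession named spark):

    from pyspark.ml.linalg import Vectors

    # same data as the Scala example above
    training = spark.createDataFrame([
        (1.0, Vectors.dense(0.0, 1.1, 0.1)),
        (0.0, Vectors.dense(2.0, 1.0, -1.0)),
        (0.0, Vectors.dense(2.0, 1.3, 1.0)),
        (1.0, Vectors.dense(0.0, 1.2, -0.5)),
    ], ["label", "features"])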

    read

    Read files stored in CSV or JSON format

    sc = spark.sparkContext

    df = spark.read.json("examples/src/main/resources/people.json")

    df = spark.read.option("delimiter", "\t").option("header", "true").csv(path) [read a tab-separated file into a DataFrame]

    df.write.option("header", True) \
      .csv("/tmp/spark_output/datacsv")

    people = spark.read.parquet("...")

    column get

    Column operations

    get a column: ageCol = people.age

    Add a new column derived from existing columns:

    from pyspark.sql import functions as func

    df_with_x5 = df_with_x4.withColumn("x5", func.exp("x3"))

    df_with_x7 = df_with_x6.withColumn("x7", func.rand())

    func.col("id")

    row: row = Row(name="Alice", age=11)

    'wrong_key' in Person
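
    Here Person is a Row used as a "class" (a template of field names); a minimal sketch of both Row styles:

    from pyspark.sql import Row

    row = Row(name="Alice", age=11)     # Row with named fields
    row.name                            # 'Alice'

    Person = Row("name", "age")         # Row used as a class / template
    alice = Person("Alice", 11)
    "name" in Person                    # True
    "wrong_key" in Person               # False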

    define function [user-defined function (UDF)]

    create function:

    from pyspark.sql.functions import udf

    @udf

    def to_upper(s):

       if s is not None:

           return s.upper()  

    df_new2 = df_new.withColumn("topnum", get_front3_value("Country"))  [get_front3_value is another UDF defined the same way]

    Pass in a constant value:

    use lit:

    from pyspark.sql.functions import lit
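
    For example, to add a column whose value is the same constant in every row (constant_col is an illustrative column name):

    df = df.withColumn("constant_col", lit(1.0))   # every row gets the constant 1.0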

    show the contents of a DataFrame

    df.show()

    Apply UDFs and select columns

    df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age"))  [slen and add_one are other UDFs defined the same way]

    Deduplicate

    dataFrame.dropDuplicates(["id", "label"])

    dataFrame.distinct()
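
    distinct() compares all columns, while dropDuplicates can compare only a subset; a small sketch with made-up data:

    df = spark.createDataFrame(
        [(1, "a", 10), (1, "a", 20), (2, "b", 30)],
        ["id", "label", "value"])

    df.distinct().count()                       # 3: every row differs in "value"
    df.dropDuplicates(["id", "label"]).count()  # 2: only (id, label) is compared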

    vector

    Vector operations

    dense vector: a = Vectors.dense([1.0, 2.0])

    sparse vector: SparseVector(2, [0, 1], [2., 1.])

    vector: toArray()
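
    A runnable sketch of the above (both vector types come from pyspark.ml.linalg):

    from pyspark.ml.linalg import Vectors, SparseVector

    a = Vectors.dense([1.0, 2.0])            # dense vector
    s = SparseVector(2, [0, 1], [2.0, 1.0])  # size, indices, values
    a.toArray()                              # array([1., 2.])
    s.toArray()                              # array([2., 1.])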

    MLlib

    import

    from pyspark.ml.clustering import KMeans

    initial parameters [initialize model hyperparameters]

    kmeans = KMeans(k=100, maxIter=200)

    train model

    model0 = kmeans.fit(input0.limit(200000))

    model0.clusterCenters()  [clusterCenters belongs to the fitted model, not the estimator]
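
    Putting those steps together, a sketch of the full flow (input0 is the feature DataFrame from the notes; centers and clustered are illustrative names):

    from pyspark.ml.clustering import KMeans

    kmeans = KMeans(k=100, maxIter=200)          # featuresCol defaults to "features"
    model0 = kmeans.fit(input0.limit(200000))    # fit on a 200k-row sample
    centers = model0.clusterCenters()            # list of cluster centers (numpy arrays)
    clustered = model0.transform(input0)         # adds a "prediction" column with the cluster id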

    data split

    Split into training and test data

    (trainingData, testData) = data.randomSplit([0.7, 0.3])

    Feature scaling

    Scale features (by bucketing or standardization)

    max min scaler:

    from pyspark.ml.feature import MinMaxScaler

    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")  [min-max scaling]

    scalerModel = scaler.fit(dealfeature)

    scaled = scalerModel.transform(dealfeature)

    bucket scaler:

    from pyspark.ml.feature import Bucketizer

    splits = [float("-inf"), 0, 100, 200, 300, 400, 500, 600, 700,

              800, 900, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, float("inf")]

    dealinput = dealinput.withColumn("ctr", dealinput["ctr"].cast("float")).withColumn("ctrv3", dealinput["ctrv3"].cast("float"))

    bucketizer = Bucketizer(splits=splits, inputCol="ctr", outputCol="bucketedctr")

    dealout = bucketizer.transform(dealinput)

    Data type inspection and conversion

    View data types: df.dtypes

    Convert data types: dealoutput = dealinput.select(dealinput["Clicks"].cast("float"))

    train test split

    (deal_train, deal_test) = dealfeature.randomSplit([0.9,0.1])

    data filter

    Filter data [conditional selection]

    outanaly = predres.filter(predres.prediction == 1).select("InClickBinary", "prediction")

    metrics

    Metric evaluation

    from pyspark.mllib.evaluation import MulticlassMetrics

    predictionAndLabels = predres.rdd.map(lambda x:(float(x.prediction), float(x.InClickBinary)))

    metrics = MulticlassMetrics(predictionAndLabels)

    print("Area under accauracy = %s" % metrics.precision(1.0))

    print("Area under recall = %s" % metrics.recall(1.0))

    assemble columns into a vector

    Assemble the per-column features into a single dense feature vector

    #get the vector feature

    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(

         inputCols = ["Clicks", "BinaryFeature","isMarket", "ctrv2", "ctrv3", "Feature1", "Feature2", "Feature3", "Feature4", "Feature5", "Feature6", "Feature7", "Feature8"],

         outputCol = "features")

    dealfeature = assembler.transform(dealoutput)

    (deal_train, deal_test) = dealfeature.randomSplit([0.9,0.1])

    column to sparse vector

    Turn N feature columns into a sparse vector (assumes featurekeys, the full list of feature names, is already defined)

    featureids = [i for i in range(len(featurekeys))]

    featuredict = dict(zip(featurekeys, featureids))

    speckeys = ["ctrv2", "ctrv3", "isMarket", "BinaryFeature"]

    from pyspark.sql.functions import struct, udf

    from pyspark.ml.linalg import Vectors, VectorUDT

    def get_sparse_vector(x):

       valuedict = {}

       # numeric features: map each column in speckeys to its feature index
       for index, item in enumerate(speckeys):

           valuedict[featuredict[item]] = x[index]

       # categorical features: one-hot encode prediction0..prediction7 by value
       for index in range(8):

           nindex = 4 + index

           valuedict[featuredict["prediction%s_%s" % (index, x[nindex])]] = 1.0

       return Vectors.sparse(len(featuredict), valuedict)

    sparsecreate = udf(get_sparse_vector, VectorUDT())

    dealfeature = dealout.withColumn("features", sparsecreate(struct("A", "B", "C", "D", "E", "F", "G", "H","I", "J", "K", "L", "M")))

    model train

    Using LogisticRegression as an example:

    from pyspark.ml.classification import LogisticRegression

    lr = LogisticRegression(featuresCol="features", labelCol="BinaryFeature", maxIter=200)

    lrmodel = lr.fit(deal_train)

    predreslr = lrmodel.transform(deal_test)
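
    One way to score the resulting predictions (not part of the original notes) is an ML evaluator; a minimal sketch assuming the label column "BinaryFeature" used above:

    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    evaluator = BinaryClassificationEvaluator(
        labelCol="BinaryFeature",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC")
    print("AUC = %s" % evaluator.evaluate(predreslr))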
