美文网首页
PySpark的使用

PySpark的使用

作者: 羋学僧 | 来源:发表于2020-12-02 10:59 被阅读0次

Spark中使用Python实现WordCount业务

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonWordCount").master("local[*]").getOrCreate()
# 将文件转换为RDD对象
lines = spark.read.text("input.txt").rdd.map(lambda r: r[0])

counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
spark.stop()

PySpark中的DataFrame

  • DataFrame类似于Python中的数据表,允许处理大量结构化数据
  • DataFrame优于RDD,同时包含RDD的功能
# 从集合中创建RDD
rdd = spark.sparkContext.parallelize([
    (1001, "张飞", 8341, "坦克"),
    (1002, "关羽", 7107, "战士"),
    (1003, "刘备", 6900, "战士")
])

# 指定模式, StructField(name,dataType,nullable)
# name: 该字段的名字,dataType:该字段的数据类型,nullable: 指示该字段的值是否为空
from pyspark.sql.types import StructType, StructField, LongType, StringType  # 导入类型
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("hp", LongType(), True), #生命值
    StructField("role_main", StringType(), True)
])
# 对RDD应用该模式并且创建DataFrame
heros = spark.createDataFrame(rdd, schema)
heros.show()

# 利用DataFrame创建一个临时视图
heros.registerTempTable("HeroGames")
# 查看DataFrame的行数
print(heros.count())
# 使用自动类型推断的方式创建dataframe
data = [(1001, "张飞", 8341, "坦克"),
        (1002, "关羽", 7107, "战士"),
        (1003, "刘备", 6900, "战士")]
df = spark.createDataFrame(data, schema=['id', 'name', 'hp', 'role_main'])
print(df) #只能显示出来是DataFrame的结果
df.show() #需要通过show将内容打印出来
print(df.count())
  • 从CSV文件中读取
heros = spark.read.csv("./heros.csv", header=True, inferSchema=True)
heros.show()
  • 从MySQL中读取
df = spark.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/wucai?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai',
    dbtable='heros',
    user='root',
    password='passw0rdcc4'
    ).load()
print('连接JDBC,调用Heros数据表')
df.show()
# 传入SQL语句
sql="(SELECT id, name, hp_max, role_main FROM heros WHERE role_main='战士') t"
df = spark.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/wucai?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai',
    dbtable=sql,
    user='root',
    password='passw0rdcc4' 
    ).load()
df.show()

Project:Titanic乘客生存预测(Spark)

泰坦尼克海难是著名的十大灾难之一,究竟多少人遇难,各方统计的结果不一。现在我们可以得到部分的数据:
GitHub地址:https://github.com/cystanford/Titanic_Data
数据集格式为csv,一共有两个文件:
train.csv:训练集,包含特征信息,分类结果(存活与否);
test.csv:测试集,只包含特征信息

数据集中的字段内容如下:

字段 描述
PassengerId 乘客编号
Survived 是否幸存
Pclas,有些特征标注的英文 船票等级
Name 乘客姓名
Sex 乘客性别
SibSp 亲戚数量(兄妹、配偶数)
Parch 亲戚数量(父母、子女数)
Ticket 船票号码
Fare 船票价格
Cabin 船舱
Embarked 登陆港口

我们需要对数据进行预处理:

  • 数据加载
  • 数据探索
  • 缺失值
  • 数值为英文的情况
  • 特征选择
  • 模型训练与预测
  • 模型评估

创建SparkSession

spark=SparkSession.builder.appName('Titanic').getOrCreate()
# 数据加载
df = spark.read.csv("./train.csv", header=True, inferSchema=True).cache()
df.createOrReplaceTempView("train")

EDA探索


# 输出schema,dataframe的数据结构信息
df.printSchema()
# 对Age字段进行描述统计
df.describe(['age']).show()
from pyspark.sql.functions import *
# 缺失值统计, Spark SQL类型转换使用cast, col函数将字符串转换为column对象
df.select(*(
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns)).show()
# 使用"""定义多行字符串
query = """
SELECT Embarked, count(PassengerId) as count
FROM train
WHERE Survived = 1
GROUP BY Embarked
"""
spark.sql(query).show()
# cabin字段缺失值多,去掉该字段
df = df.drop('cabin')
df.show()
# 按照survived字段统计
df.groupBy('survived').count().show()
# 按照survived的一定比例进行采样
sample_df = df.sampleBy('survived', fractions={0: 0.1, 1: 0.5}, seed=0)
sample_df.groupBy('survived').count().show()
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# 添加字段len_name,代表 乘客name的长度
str_length = udf(lambda x: len(x), IntegerType())
# 使用withColumn添加字段
df = df.withColumn('len_name', str_length(df['name']))
df.select('name', 'len_name').show(5)
# 将类别变量 转化为数值
def embarked_to_int(embarked):
    if embarked == 'C': return 1
    if embarked == 'Q': return 2
    if embarked == 'S': return 3    
    return 0

# 使用udf,定义函数,将类别变量 转化为数值,使用Spark ML中StringIndexer,结果也是一样的
embarked_to_int = udf(embarked_to_int, IntegerType())
# 添加embarked_index字段
df = df.withColumn('embarked_index', embarked_to_int(df['embarked']))
df.select('embarked', 'embarked_index').show(5)
# 计算各列的均值
mean = df.agg(*(mean(c).alias(c) for c in df.columns))
# 字典数据持久化
meaninfo = mean.first().asDict()
print(meaninfo)
# 填充
df = df.fillna(meaninfo["Age"])

数据抽取、转换和特征选择

from pyspark.ml.feature import StringIndexer, VectorAssembler
# StringIndexer将一组字符型标签编码成一组标签索引
df = StringIndexer(inputCol='Sex', outputCol='sex_index').fit(df).transform(df)
df.select('Sex', 'sex_index').show(5)
#使用VectorAssembler将给定列列表组合成单个向量列
inputCols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'embarked_index', 'sex_index', 'len_name']
assembler = VectorAssembler(inputCols=inputCols, outputCol='features')
train = assembler.transform(df).select('PassengerId', col('Survived').alias('label'), 'features')
train.show(5)

模型训练与预测

# 使用随机森林
from pyspark.ml.classification import RandomForestClassifier
# 将数据集切分为80%训练集,20%测试集
splits = train.randomSplit([0.8, 0.2])
train = splits[0].cache()
test = splits[1].cache()
model = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    cacheNodeIds=True) # cacheNodeIds: 是否缓存节点ID
# 使用train进行训练,test进行预测
predict = model.fit(train).transform(test)
predict.show(5)

模型评估

  • 二分类评估:BinaryClassificationEvaluator
  • 多分类评估:MulticlassClassificationEvaluator
  • 回归评估:RegressionEvaluator
  • 聚类评估:ClusteringEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction", 
    labelCol="label", 
    metricName="accuracy")
print(evaluator.evaluate(predict))
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='prediction',
    labelCol='label',
    metricName='areaUnderROC') # ROC曲线下的面积 = AUC 越大越好
# 正样本排在负样本前面的概率
print(evaluator.evaluate(predict))

相关文章

网友评论

      本文标题:PySpark的使用

      本文链接:https://www.haomeiwen.com/subject/insswktx.html