Spark中使用Python实现WordCount业务

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PythonWordCount").master("local[*]").getOrCreate()
# 将文件转换为RDD对象
lines = spark.read.text("input.txt").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()

PySpark中的DataFrame
- DataFrame类似于Python中的数据表,允许处理大量结构化数据
- DataFrame优于RDD,同时包含RDD的功能
# 从集合中创建RDD
rdd = spark.sparkContext.parallelize([
(1001, "张飞", 8341, "坦克"),
(1002, "关羽", 7107, "战士"),
(1003, "刘备", 6900, "战士")
])
# 指定模式, StructField(name,dataType,nullable)
# name: 该字段的名字,dataType:该字段的数据类型,nullable: 指示该字段的值是否为空
from pyspark.sql.types import StructType, StructField, LongType, StringType # 导入类型
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("hp", LongType(), True), #生命值
StructField("role_main", StringType(), True)
])
# 对RDD应用该模式并且创建DataFrame
heros = spark.createDataFrame(rdd, schema)
heros.show()

# 利用DataFrame创建一个临时视图
heros.registerTempTable("HeroGames")
# 查看DataFrame的行数
print(heros.count())
# 使用自动类型推断的方式创建dataframe
data = [(1001, "张飞", 8341, "坦克"),
(1002, "关羽", 7107, "战士"),
(1003, "刘备", 6900, "战士")]
df = spark.createDataFrame(data, schema=['id', 'name', 'hp', 'role_main'])
print(df) #只能显示出来是DataFrame的结果
df.show() #需要通过show将内容打印出来
print(df.count())

- 从CSV文件中读取
heros = spark.read.csv("./heros.csv", header=True, inferSchema=True)
heros.show()
- 从MySQL中读取
df = spark.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/wucai?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai',
dbtable='heros',
user='root',
password='passw0rdcc4'
).load()
print('连接JDBC,调用Heros数据表')
df.show()
# 传入SQL语句
sql="(SELECT id, name, hp_max, role_main FROM heros WHERE role_main='战士') t"
df = spark.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/wucai?useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai',
dbtable=sql,
user='root',
password='passw0rdcc4'
).load()
df.show()
Project:Titanic乘客生存预测(Spark)
泰坦尼克海难是著名的十大灾难之一,究竟多少人遇难,各方统计的结果不一。现在我们可以得到部分的数据:
GitHub地址:https://github.com/cystanford/Titanic_Data
数据集格式为csv,一共有两个文件:
train.csv:训练集,包含特征信息,分类结果(存活与否);
test.csv:测试集,只包含特征信息
数据集中的字段内容如下:
字段 | 描述 |
---|---|
PassengerId | 乘客编号 |
Survived | 是否幸存 |
Pclas,有些特征标注的英文 | 船票等级 |
Name | 乘客姓名 |
Sex | 乘客性别 |
SibSp | 亲戚数量(兄妹、配偶数) |
Parch | 亲戚数量(父母、子女数) |
Ticket | 船票号码 |
Fare | 船票价格 |
Cabin | 船舱 |
Embarked | 登陆港口 |
我们需要对数据进行预处理:
- 数据加载
- 数据探索
- 缺失值
- 数值为英文的情况
- 特征选择
- 模型训练与预测
- 模型评估
创建SparkSession
spark=SparkSession.builder.appName('Titanic').getOrCreate()
# 数据加载
df = spark.read.csv("./train.csv", header=True, inferSchema=True).cache()
df.createOrReplaceTempView("train")

EDA探索
# 输出schema,dataframe的数据结构信息
df.printSchema()
# 对Age字段进行描述统计
df.describe(['age']).show()

from pyspark.sql.functions import *
# 缺失值统计, Spark SQL类型转换使用cast, col函数将字符串转换为column对象
df.select(*(
sum(col(c).isNull().cast("int")).alias(c)
for c in df.columns)).show()

# 使用"""定义多行字符串
query = """
SELECT Embarked, count(PassengerId) as count
FROM train
WHERE Survived = 1
GROUP BY Embarked
"""
spark.sql(query).show()

# cabin字段缺失值多,去掉该字段
df = df.drop('cabin')
df.show()

# 按照survived字段统计
df.groupBy('survived').count().show()
# 按照survived的一定比例进行采样
sample_df = df.sampleBy('survived', fractions={0: 0.1, 1: 0.5}, seed=0)
sample_df.groupBy('survived').count().show()

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# 添加字段len_name,代表 乘客name的长度
str_length = udf(lambda x: len(x), IntegerType())
# 使用withColumn添加字段
df = df.withColumn('len_name', str_length(df['name']))
df.select('name', 'len_name').show(5)

# 将类别变量 转化为数值
def embarked_to_int(embarked):
if embarked == 'C': return 1
if embarked == 'Q': return 2
if embarked == 'S': return 3
return 0
# 使用udf,定义函数,将类别变量 转化为数值,使用Spark ML中StringIndexer,结果也是一样的
embarked_to_int = udf(embarked_to_int, IntegerType())
# 添加embarked_index字段
df = df.withColumn('embarked_index', embarked_to_int(df['embarked']))
df.select('embarked', 'embarked_index').show(5)

# 计算各列的均值
mean = df.agg(*(mean(c).alias(c) for c in df.columns))
# 字典数据持久化
meaninfo = mean.first().asDict()
print(meaninfo)
# 填充
df = df.fillna(meaninfo["Age"])

数据抽取、转换和特征选择
from pyspark.ml.feature import StringIndexer, VectorAssembler
# StringIndexer将一组字符型标签编码成一组标签索引
df = StringIndexer(inputCol='Sex', outputCol='sex_index').fit(df).transform(df)
df.select('Sex', 'sex_index').show(5)

#使用VectorAssembler将给定列列表组合成单个向量列
inputCols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'embarked_index', 'sex_index', 'len_name']
assembler = VectorAssembler(inputCols=inputCols, outputCol='features')
train = assembler.transform(df).select('PassengerId', col('Survived').alias('label'), 'features')
train.show(5)

模型训练与预测
# 使用随机森林
from pyspark.ml.classification import RandomForestClassifier
# 将数据集切分为80%训练集,20%测试集
splits = train.randomSplit([0.8, 0.2])
train = splits[0].cache()
test = splits[1].cache()
model = RandomForestClassifier(
labelCol="label",
featuresCol="features",
cacheNodeIds=True) # cacheNodeIds: 是否缓存节点ID
# 使用train进行训练,test进行预测
predict = model.fit(train).transform(test)
predict.show(5)

模型评估
- 二分类评估:BinaryClassificationEvaluator
- 多分类评估:MulticlassClassificationEvaluator
- 回归评估:RegressionEvaluator
- 聚类评估:ClusteringEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
predictionCol="prediction",
labelCol="label",
metricName="accuracy")
print(evaluator.evaluate(predict))
evaluator = BinaryClassificationEvaluator(
rawPredictionCol='prediction',
labelCol='label',
metricName='areaUnderROC') # ROC曲线下的面积 = AUC 越大越好
# 正样本排在负样本前面的概率
print(evaluator.evaluate(predict))

网友评论