# coding=utf8
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
'''
Basic RDD "transformation" operations, DataFrame creation, and Spark SQL queries
'''
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Use a local path to the data file when running in local mode
if sc.master.startswith('local'):
    Path = '1.txt'
####################### Create an RDD ###########################
textRdd = sc.textFile(Path)
print(textRdd.count())
print(textRdd.take(5))
userRdd = textRdd.map(lambda line: line.split(','))
print(userRdd.take(5))
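# A couple more basic transformations on the parsed RDD, as a hedged sketch
# (assumes the same userid,age,gender,work,zipcode layout shown in 1.txt below):
# filter() keeps rows matching a predicate; map() reshapes each element.
maleRdd = userRdd.filter(lambda fields: fields[2] == 'M')
print(maleRdd.count())
agePairs = userRdd.map(lambda fields: (fields[2], int(fields[1])))
print(agePairs.take(5))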
####################### Create a DataFrame ###########################
# Create a SparkSession (kept under the original variable name sqlContext)
sqlContext = SparkSession.builder.getOrCreate()
# Define the schema by mapping each split line to a Row
user_rows = userRdd.map(lambda p:
    Row(
        userid=int(p[0]),
        age=int(p[1]),
        gender=p[2],
        work=p[3],
        zipcode=p[4]
    )
)
print(user_rows.collect())
# Create the DataFrame
user_df = sqlContext.createDataFrame(user_rows)
user_df.printSchema()
user_df.show()
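# Alternative sketch (not from the original): build the same DataFrame with an
# explicit StructType instead of inferring the schema from Row objects.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
user_schema = StructType([
    StructField('userid', IntegerType(), True),
    StructField('age', IntegerType(), True),
    StructField('gender', StringType(), True),
    StructField('work', StringType(), True),
    StructField('zipcode', StringType(), True),
])
typed_rdd = userRdd.map(lambda p: (int(p[0]), int(p[1]), p[2], p[3], p[4]))
user_df2 = sqlContext.createDataFrame(typed_rdd, schema=user_schema)
user_df2.printSchema()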
# Create an alias for the DataFrame
udf = user_df.alias('udf')
udf.show()
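# Aliases mainly matter when the same columns appear twice, e.g. in a
# self-join. A hedged sketch (the pairing condition is illustrative only):
a = user_df.alias('a')
b = user_df.alias('b')
same_zip = a.join(b, (a.zipcode == b.zipcode) & (a.userid < b.userid))
same_zip.select('a.userid', 'b.userid', 'a.zipcode').show()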
####################### Use Spark SQL ###########################
# Register a temporary view (registerTempTable is deprecated in Spark 2.x)
user_df.createOrReplaceTempView('user_table')
# Count the number of rows
sqlContext.sql('select count(*) counts from user_table').show()
# Spark SQL statements can span multiple lines
sqlContext.sql('''
    select count(*) counts
    from user_table
''').show()
# Inspect the data with Spark SQL
# Use LIMIT to cap the number of rows displayed
sqlContext.sql('select * from user_table limit 5').show()
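# A further sketch: a GROUP BY aggregation over the same temp view, plus the
# equivalent DataFrame-API call (both assume the columns defined above).
sqlContext.sql('''
    select gender, count(*) counts, avg(age) avg_age
    from user_table
    group by gender
''').show()
user_df.groupBy('gender').count().show()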
Contents of 1.txt:
1,24,M,technician,85711
2,53,F,other,94043
3,23,M,writer,32067
4,24,M,technician,43537
5,33,F,other,16213