Goal: jieba + Python + Spark + Kafka + Streaming
Materials....
sudo gedit /etc/profile
# add jdk
export JAVA_HOME=/home/kean/app/jdk1.8.0_121
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
# added by Anaconda3 installer
export ANACONDA_ROOT=/home/kean/app/anaconda3
export PATH=${ANACONDA_ROOT}/bin:$PATH
# set pyspark driver
export PYSPARK_DRIVER_PYTHON=$ANACONDA_ROOT/bin/ipython
export PYSPARK_PYTHON=$ANACONDA_ROOT/bin/python
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
The first step is done!!!!
Reference: blog.csdn.net/duxu24/article/details/53587451
After pyspark starts, type in a word count to test it (a sketch follows below).
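For example, a tiny word count that can be pasted into the pyspark shell (sc is already provided there; the sample sentences are made up for illustration):

lines = sc.parallelize(["hello spark", "hello kafka", "hello jieba"])
counts = (lines.flatMap(lambda line: line.split(" "))   # split each line into words
               .map(lambda word: (word, 1))             # pair every word with a count of 1
               .reduceByKey(lambda a, b: a + b))        # sum the counts per word
print(counts.collect())
# e.g. [('hello', 3), ('spark', 1), ('kafka', 1), ('jieba', 1)] (order may differ)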
Next, build the local distributed setup.
# add scala
export SCALA_HOME=/home/kean/app/scala-2.11.8
export PATH=${SCALA_HOME}/bin:$PATH
# add spark
export SPARK_HOME=/home/kean/app/spark-2.2.1-bin-hadoop2.7
export PATH=${SPARK_HOME}/bin:${SPARK_HOME}/sbin:$PATH
# add pyspark's python to PYTHONPATH
export PYTHONPATH=${SPARK_HOME}/python
export PYTHONPATH=${SPARK_HOME}/python:${SPARK_HOME}/python/py4j-0.10.4-src.zip:$PYTHONPATH # I think the py4j entry could be dropped
Testing ran into a py4j error:
Fix: run pip install py4j; or go to the SPARK_HOME/python/lib directory, find the py4j-0.10.4-src.zip archive there, and unzip it into SPARK_HOME/python/.
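A quick way to check that the fix took effect (just a throwaway check, not part of the original steps):

import py4j
print(py4j.__file__)   # shows which py4j installation the interpreter picked up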
Using it in PyCharm:
When creating a project, choose anaconda/bin/python as the interpreter. There are also two checkboxes below it; tick both so the project can directly use the modules already installed in Anaconda.
http://blog.csdn.net/u010171031/article/details/51849562
# encoding: utf-8
"""
@version: python3.6
@author: 'kean'
@contact:
@site:
@software: PyCharm
@file: xixi.py
@time: 18-1-21 下午4:01
"""
import os
import sys
# Path for spark source folder
# os.environ['SPARK_HOME'] = "/home/kean/app/spark-2.2.1-bin-hadoop2.7"
# You might need to enter your local IP
# os.environ['SPARK_LOCAL_IP']="192.168.2.138"
# Path for pyspark and py4j
# sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python")
# sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip")
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Can not import Spark Modules", e)
    sys.exit(1)
sc = SparkContext('local')
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka"])
print(words.count())
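If everything is wired up correctly, running this from PyCharm should print "Successfully imported Spark Modules" followed by 5, the count of the five words in the list.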
Hadoop is not used here, because the thing I am testing, reading data from Kafka with Streaming, does not involve it yet.
Reading data from Kafka
To run the Kafka-reading script from PyCharm, place the jar below under SPARK_HOME/jars; alternatively, submit the script with spark-submit and attach the jar (for example via --jars):
spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
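If copying the jar into SPARK_HOME/jars is not an option, another trick that works from PyCharm is to pass the jar through PYSPARK_SUBMIT_ARGS before the SparkContext is created; the jar path below is an assumption and needs to point at wherever the jar actually lives:

import os
# must be set before SparkContext/StreamingContext are constructed
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars /home/kean/app/spark-2.2.1-bin-hadoop2.7/jars/'
    'spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar pyspark-shell'
)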
# encoding: utf-8
"""
@version: python3.6
@author: 'kean'
@contact:
@site:
@software: PyCharm
@file: play.py
@time: 18-1-21 下午4:39
"""
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
# from pysparktest import myFunc
'''
offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd
def printOffsetRanges(rdd):
    print(rdd.count())
    for o in offsetRanges:
        print("__________________________________")
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
        print("__________________________________")
'''
def start():
    sc = SparkContext("local[2]", "KafkaConsumerTest")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    zookeeper = "192.168.1.xxx:2181"
    # broker = "192.168.1.xxx:9092"
    topic = "info-news"
    partition = 0
    groupid = "1655"
    start = 0
    topicPartition = TopicAndPartition(topic, partition)
    # fromOffset = {topicPartition: int(start)}
    # receiver-based stream through ZooKeeper; start from the earliest offset when the group has none committed
    kafkaStream = KafkaUtils.createStream(ssc, zookeeper, groupid, {topic: 1}, {'auto.offset.reset': 'smallest'})
    # kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": broker}, fromOffsets=fromOffset)
    lines = kafkaStream.map(lambda x: x[1])  # each record is a (key, message) pair; keep the message
    lines.pprint()
    # kafkaStream.pprint()
    ssc.start()
    ssc.awaitTermination()
'''
directKafkaStream\
    .transform(storeOffsetRanges)\
    .foreachRDD(printOffsetRanges)
'''
if __name__ == '__main__':
    start()
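Since the goal at the top also mentions jieba, a natural extension is to segment every Kafka message and count the words in each batch. A minimal sketch, assuming jieba is pip-installed; it would replace lines.pprint() inside start() (the name word_counts is mine):

import jieba

word_counts = (lines.flatMap(lambda msg: jieba.lcut(msg))   # cut each message into Chinese words
                    .map(lambda word: (word, 1))
                    .reduceByKey(lambda a, b: a + b))        # word counts within each 2-second batch
word_counts.pprint()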