ubuntu pyspark

Author: Kean_L_C | Published 2018-01-25 10:59

    Goal: jieba + Python + Spark + Kafka + Streaming

    Materials: JDK 1.8.0_121, Anaconda3, Scala 2.11.8, Spark 2.2.1 (spark-2.2.1-bin-hadoop2.7), and the Kafka streaming assembly jar.

    sudo gedit /etc/profile

    # add jdk
    export JAVA_HOME=/home/kean/app/jdk1.8.0_121
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export PATH=${JAVA_HOME}/bin:$PATH
    
    # added by Anaconda3 installer
    export ANACONDA_ROOT=/home/kean/app/anaconda3
    export PATH=${ANACONDA_ROOT}/bin:$PATH
    
    # set pyspark driver
    export PYSPARK_DRIVER_PYTHON=$ANACONDA_ROOT/bin/ipython
    export PYSPARK_PYTHON=$ANACONDA_ROOT/bin/python
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
    

    Run source /etc/profile (or log out and back in) to apply the changes, and the first step is done!
    Reference: blog.csdn.net/duxu24/article/details/53587451
    After pyspark starts, try a word count, for example:
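
    A minimal word count to run in the notebook (a sketch: the file name wordcount.txt is an assumption for illustration; sc is the SparkContext that pyspark creates for you):

    text = sc.textFile("wordcount.txt")                # load a local text file
    counts = (text.flatMap(lambda line: line.split())  # split each line into words
                  .map(lambda word: (word, 1))         # pair every word with a count of 1
                  .reduceByKey(lambda a, b: a + b))    # sum the counts per word
    print(counts.take(10))                             # show the first 10 (word, count) pairs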

    Next, build the local standalone setup (Scala and Spark):

    # add scala
    export SCALA_HOME=/home/kean/app/scala-2.11.8
    export PATH=${SCALA_HOME}/bin:$PATH
    
    
    # add spark
    export SPARK_HOME=/home/kean/app/spark-2.2.1-bin-hadoop2.7
    export PATH=${SPARK_HOME}/bin:${SPARK_HOME}/sbin:$PATH
    
    # add pyspark's python libs to PYTHONPATH
    export PYTHONPATH=${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH  # the py4j zip entry can probably be dropped
    

    Testing hit a py4j error (the py4j module could not be found).



    Fix: either pip install py4j, or look in the SPARK_HOME/python/lib directory, where there is an archive named py4j-0.10.4-src.zip; extract it into SPARK_HOME/python/ and the import will work.
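
    A quick sanity check (a minimal sketch; run it with the same interpreter PySpark will use):

    # confirm py4j is importable after either fix
    try:
        import py4j
        print("py4j found at", py4j.__file__)
    except ImportError:
        print("py4j is still missing from the path")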


    Using it in PyCharm:

    When creating a project, choose anaconda/bin/python as the interpreter; below it there are two checkboxes, and ticking both lets the project use the modules already installed in Anaconda.
    http://blog.csdn.net/u010171031/article/details/51849562

    # encoding: utf-8
    """
    @version: python3.6
    @author: 'kean'
    @contact: 
    @site: 
    @software: PyCharm
    @file: xixi.py
    @time: 18-1-21 4:01 PM
    """
    
    import os
    import sys
    
    
    
    # Path for spark source folder
    # os.environ['SPARK_HOME'] = "/home/kean/app/spark-2.2.1-bin-hadoop2.7"
    # You might need to enter your local IP
    # os.environ['SPARK_LOCAL_IP']="192.168.2.138"
    
    # Path for pyspark and py4j
    # sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python")
    # sys.path.append("/home/kean/app/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip")
    
    try:
        from pyspark import SparkContext
        from pyspark import SparkConf
    
        print("Successfully imported Spark Modules")
    except ImportError as e:
        print("Can not import Spark Modules", e)
        sys.exit(1)
    
    # run a tiny local job to confirm PySpark works end to end
    sc = SparkContext('local')
    words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka"])
    print(words.count())  # prints 5
    sc.stop()
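
    Running xixi.py should print "Successfully imported Spark Modules" and then 5, the number of elements in the parallelized list.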
    

    Hadoop isn't used here, because my test of reading from Kafka with Spark Streaming doesn't need it yet.

    Reading data from Kafka

    To run the Kafka-reading script from PyCharm, put the jar below into SPARK_HOME's jars directory, or add it with spark-submit's --jars option when submitting the script:
    spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
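
    Alternatively, the jar can be injected from inside the script, before the SparkContext is created, through the PYSPARK_SUBMIT_ARGS environment variable (a sketch that assumes the jar sits in the working directory; adjust the path to taste):

    import os

    # must be set before pyspark launches the JVM; the trailing 'pyspark-shell' is required
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        '--jars spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar pyspark-shell'
    )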

    # encoding: utf-8
    """
    @version: python3.6
    @author: 'kean'
    @contact: 
    @site: 
    @software: PyCharm
    @file: play.py
    @time: 18-1-21 4:39 PM
    """
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    
    # from pysparktest import myFunc
    
    '''
    # offset-tracking helpers; note rdd.offsetRanges() only exists on RDDs
    # produced by createDirectStream, not on the receiver-based createStream
    offsetRanges = []
    def storeOffsetRanges(rdd):  
        global offsetRanges  
        offsetRanges = rdd.offsetRanges()  
        return rdd  
    
    def printOffsetRanges(rdd):  
        print(rdd.count())  
        for o in offsetRanges:  
            print("__________________________________")
            print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
            print("__________________________________")
    '''
    
    
    def start():
        sc = SparkContext("local[2]", "KafkaConsumerTest")  # at least 2 local threads: one for the receiver, one for processing
        ssc = StreamingContext(sc, 2)  # 2-second batch interval
    
        zookeeper = "192.168.1.xxx:2181"  # ZooKeeper quorum of the Kafka cluster
        # broker = "192.168.1.xxx:9092"
        topic = "info-news"
        partition = 0
        groupid = "1655"
        # start and topicPartition are only used by the commented-out direct stream below
        start = 0
        topicPartition = TopicAndPartition(topic, partition)
        # fromOffset = {topicPartition: int(start)}
    
        # receiver-based stream: {topic: 1} asks for one consumer thread, and
        # 'auto.offset.reset': 'smallest' starts from the earliest available offset
        kafkaStream = KafkaUtils.createStream(ssc, zookeeper, groupid, {topic: 1}, {'auto.offset.reset': 'smallest'})
        # kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams={"metadata.broker.list": broker}, fromOffsets=fromOffset)
        lines = kafkaStream.map(lambda x: x[1])  # each element is (key, message); keep the message body
        lines.pprint()  # print a sample of every batch
        # kafkaStream.pprint()
        ssc.start()
        ssc.awaitTermination()
        '''
        directKafkaStream\
        .transform(storeOffsetRanges)\
        .foreachRDD(printOffsetRanges)
        '''
    
    
    if __name__ == '__main__':
        start()
    
    
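    The stated goal includes jieba, but the post never wires it in. A minimal sketch of segmenting the Kafka messages inside start(), assuming pip install jieba (the names words and counts are illustrative):

    import jieba  # pip install jieba

    # inside start(), after lines = kafkaStream.map(lambda x: x[1]):
    words = lines.flatMap(lambda line: jieba.cut(line))  # segment each message into Chinese words
    counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)  # per-batch word counts
    counts.pprint()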
