Spark + Hadoop Distributed Experiment


Author: SMusk | Published 2022-06-12 13:37

    Setting Up the Environment

    1. Configure Docker

    Install Docker with the following commands:

    sudo apt-get update
    sudo apt-get install docker.io
    sudo systemctl start docker
    sudo systemctl enable docker
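    

    To check that Docker installed correctly and the daemon is running, a quick sanity test (hello-world is just a throwaway test image):

    sudo docker --version
    sudo docker run --rm hello-world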
    

    Add the current user to the group that is allowed to run Docker:

    # create a docker group, used to grant the corresponding permissions
    sudo su # switch to root
    groupadd docker # create the docker group
    gpasswd -a user docker # add user to the docker group
    

    Then list the members of the docker group:

    getent group docker
    

    If the user you just added appears in the output, the operation succeeded.

    2. Install Hadoop

    Switch back to the previous user:

    su user
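    

    Since su starts a fresh session, the new docker group membership is picked up here, so Docker commands should now work without sudo; a quick check:

    docker ps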
    

    To install Hadoop, first search for an image:

    docker search hadoop
    

    Then choose the first result, the one with the highest STARS count.

    [Figure Setup-1.png: output of docker search hadoop]

    Pull the image:

    docker pull sequenceiq/hadoop-docker
    

    Do the same and pull the Spark image:

    docker pull bitnami/spark
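    

    The remaining Hadoop steps assume three containers named hadoop1, hadoop2 and hadoop3 are already running from the sequenceiq/hadoop-docker image and can reach each other over a shared network, and that the hostnames Master, slave1 and slave2 referenced by the configuration files below resolve inside the containers (for example via entries in each container's /etc/hosts). A minimal sketch of creating the containers; the network name and the use of /bin/bash are assumptions, not part of the original article:

    # run in server -- sketch only; container names and network are assumptions
    docker network create hadoop-net
    for i in $(seq 1 3)
    do
    docker run -itd --name hadoop${i} --network hadoop-net sequenceiq/hadoop-docker /bin/bash
    done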
    

    3. Configure Hadoop

    In the commands that follow, the comment on the first line indicates where to run them: "server" means the host machine and "container" means inside the corresponding Hadoop container.

    The commands below need to be run in each of the three containers.

    First enter hadoop1 (after generating its key, enter hadoop2 and hadoop3 and generate theirs in the same way):

    # run in server
    docker exec -it hadoop1 bash
    

    Generate an SSH key pair:

    # run in container
    cd /root/.ssh
    rm authorized_keys
    rm id_rsa*
    /etc/init.d/sshd start
    ssh-keygen -t rsa
    

    Append the public key to authorized_keys:

    # run in container
    cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys 
    exit # exit the container
    

    Merge the three nodes' public keys on the host.

    Fetch the public keys from the three nodes and save them as key1, key2 and key3:

    # run in server
    for i in $(seq 1 3)
    do
    docker cp hadoop${i}:/root/.ssh/authorized_keys key${i}
    done
    

    Combine the three keys into a single authorized_keys file and copy it to all three nodes:

    # run in server
    cat key1 key2 key3 > authorized_keys # overwrite rather than append, in case an old file exists
    for i in $(seq 1 3)
    do
    docker cp authorized_keys hadoop${i}:/root/.ssh/authorized_keys
    done
    

    Fix the file ownership on each node.

    Without this step the ownership may be wrong and the node will be unable to read authorized_keys.

    # run in server (repeat this block for hadoop2 and hadoop3)
    docker exec -it hadoop1 bash
    # run in container
    cd ~/.ssh
    chown `whoami` authorized_keys
    chgrp `whoami` authorized_keys
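    

    After the keys are in place on every node, it is worth confirming that the containers can SSH to each other without a password. A quick check from inside hadoop1 (the name hadoop2 is assumed to resolve over the shared network described earlier):

    # run in container
    ssh -o StrictHostKeyChecking=no hadoop2 hostname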
    

    Edit the configuration files

    In each Hadoop container, edit the following files in the /usr/local/hadoop-2.7.0/etc/hadoop directory.

    Add to core-site.xml:

    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    

    Add to hdfs-site.xml:

    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>Master:50090</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
    

    Add to mapred-site.xml:

    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>Master:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>Master:19888</value>
    </property>
    

    Add to yarn-site.xml:

    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>master</value>
    </property>
    

    Change slaves to:

    slave1
    slave2
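    

    If it is easier, the files can be edited once on the host and pushed into all three containers with docker cp, mirroring the loop used for the SSH keys above (the local hadoop-conf directory name is only an example):

    # run in server -- hadoop-conf/ holds the edited files (example name)
    for i in $(seq 1 3)
    do
    docker cp hadoop-conf/. hadoop${i}:/usr/local/hadoop-2.7.0/etc/hadoop/
    done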
    

    If the files were copied in from outside, fix their ownership:

    chown -R root /usr/local/hadoop-2.7.0/etc/hadoop/
    chgrp -R root /usr/local/hadoop-2.7.0/etc/hadoop/
    

    Start the cluster

    On first startup, format the NameNode on the master node:

    # run in hadoop1
    /usr/local/hadoop/bin/hdfs namenode -format
    

    Then start the cluster from the master node:

    /usr/local/hadoop/sbin/stop-all.sh
    /usr/local/hadoop/sbin/start-all.sh
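    

    To verify that the cluster is up, jps should show NameNode, SecondaryNameNode and ResourceManager on the master (and DataNode/NodeManager on the slaves), and the HDFS report should list two live datanodes:

    # run in hadoop1
    jps
    /usr/local/hadoop/bin/hdfs dfsadmin -report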
    

    Setting Up the Spark Environment

    Build the Spark cluster with Docker Compose.

    The configuration file is shown below; place it in an empty folder.

    docker-compose.yaml

    version: '2'
    
    services:
      master:
        image: bitnami/spark:latest
        hostname: master
        environment:
          - SPARK_MODE=master
          - SPARK_RPC_AUTHENTICATION_ENABLED=no
          - SPARK_RPC_ENCRYPTION_ENABLED=no
          - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
          - SPARK_SSL_ENABLED=no
        volumes:
          - ~/Code/Java/Distribution-Spark/share:/opt/share
        ports:
          - '8080:8080'
          - '4040:4040'
        container_name: spark1
      worker-1:
        image: bitnami/spark:latest
        hostname: worker1
        environment:
          - SPARK_MODE=worker
          - SPARK_MASTER_URL=spark://master:7077
          - SPARK_WORKER_MEMORY=1G
          - SPARK_WORKER_CORES=1
          - SPARK_RPC_AUTHENTICATION_ENABLED=no
          - SPARK_RPC_ENCRYPTION_ENABLED=no
          - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
          - SPARK_SSL_ENABLED=no
        volumes:
          - ~/Code/Java/Distribution-Spark/share:/opt/share
        ports:
          - '8081:8081'
        depends_on:
          - master
        container_name: spark2
      worker-2:
        image: bitnami/spark:latest
        hostname: worker2
        environment:
          - SPARK_MODE=worker
          - SPARK_MASTER_URL=spark://master:7077
          - SPARK_WORKER_MEMORY=1G
          - SPARK_WORKER_CORES=1
          - SPARK_RPC_AUTHENTICATION_ENABLED=no
          - SPARK_RPC_ENCRYPTION_ENABLED=no
          - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
          - SPARK_SSL_ENABLED=no
        volumes:
          - ~/Code/Java/Distribution-Spark/share:/opt/share
        ports:
          - '8082:8081'
        depends_on:
          - master
        container_name: spark3
    

    Run the following command to bring up the cluster:

    docker compose up -d
    

    Bringing up the cluster also creates a network, named "<folder name>_default", which the nodes use to communicate with each other.
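
    A quick way to confirm everything came up is to list the services and the generated network; the master web UI should also be reachable at http://localhost:8080 (port 8080 is mapped in the compose file above):

    docker compose ps
    docker network ls
    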

    Running the Experiment

    Use PySpark to complete the experiment:

    import pyspark
    from pyspark import SparkContext
    
    def Map1(x):
        x = x.split(",")
        return (x[1], x[3], x[4])
    
    def FilterAverage(x):
        return x[1] == "必修"
    
    def Map2(x):
        return (x[0], (float(x[2]), 1))
    
    def AddAverage(x, y):
        return (x[0]+y[0], x[1]+y[1])
    
    def Map3(x):
        return (x[0], x[1][0]/x[1][1])
    
    def Group(x):
        # x is (name, average); map the average to a bucket index 0-4
        if x[1] < 60:
            return 0
        return min(int(x[1] / 10) - 5, 4)  # cap at 4 so a score of 100 stays in [90, 100]
    
    sc = SparkContext("local", "score2")  # note: "local" set here takes precedence over spark-submit's --master flag
    tf = sc.textFile("./grades.txt")
    # filter the compulsory class grades
    tf2 = tf.map(lambda x:Map1(x)).filter(lambda x:FilterAverage(x))
    # generate (name, (grade, 1)) pairs
    tf3 = tf2.map(lambda x: Map2(x))
    # calculate the average of the grades
    tf4 = tf3.reduceByKey(lambda x, y:AddAverage(x, y)).map(lambda x: Map3(x))
    # save results
    tf4.saveAsTextFile("./result1")
    
    # group by average grade
    tf5 = tf4.groupBy(lambda x: Group(x)).sortByKey()
    resultInterval = ["[0, 60)", "[60, 70)", "[70, 80)", "[80, 90)", "[90, 100]"]
    tf6 = tf5.map(lambda x:(resultInterval[x[0]], len(x[1])))
    tf6.saveAsTextFile("./result2")
    

    The program's results are written to the result1 and result2 folders under the Spark directory.

    Save the data as grades.txt and the code as AverageScore.py.

    Copy both the data and the code into the Spark container (/opt/bitnami/spark is the container's default login directory):

    docker cp ./AverageScore.py spark1:/opt/bitnami/spark/
    docker cp ./grades.txt spark1:/opt/bitnami/spark/
    

    Enter the Spark master node and run the code:

    # run in server
    docker exec -it spark1 bash
    # run in container
    spark-submit --master spark://master:7077 AverageScore.py
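    

    When the job finishes, the output is written as part-* files inside the result1 and result2 directories; they can be inspected in the container or copied back to the host (paths assume the default directory used above):

    # run in container
    cat result1/part-* result2/part-*
    exit
    # run in server
    docker cp spark1:/opt/bitnami/spark/result1 ./result1
    docker cp spark1:/opt/bitnami/spark/result2 ./result2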
    
