Hadoop (Version 1.2.1,JDK6)hdfs

Author: MicoCube | Published 2018-01-17 22:22
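  • The two core components of the Hadoop framework are HDFS (Hadoop Distributed File System) and MapReduce: HDFS provides storage for massive amounts of data, and MapReduce provides the computation over it.
  • Hadoop documentation for each version
  • Preparation before installation
    • The Hadoop 1.2.1 binary package
    • Upload the package to one of the machines
    • Prepare 5 machines (virtual machines here, 192.168.10.215~192.168.10.219)
    • Disable the firewall on all 5 machines: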
    [root@dn3 ~]# systemctl stop firewalld.service
    [root@dn3 ~]# systemctl disable firewalld.service
    Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
    Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
    [root@dn3 ~]# iptables -F
    [root@dn3 ~]# vi /etc/selinux/config
    #SELINUX=enforcing      # comment out
    #SELINUXTYPE=targeted   # comment out
    SELINUX=disabled        # add this line
    # Never write SELINUXTYPE=disabled by mistake; if you already did,
    # you may need this: http://www.mamicode.com/info-detail-1847013.html
    [root@dn3 ~]# setenforce 0
    # Reboot all virtual machines
    [root@dn3 ~]# reboot
    [root@dn3 ~]# iptables -F
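The SELinux edit above can also be scripted instead of done in vi. A minimal Python sketch, assuming the stock CentOS 7 layout of /etc/selinux/config: it rewrites only the line that starts with SELINUX= and leaves SELINUXTYPE= alone, which avoids the SELINUXTYPE=disabled mistake. disable_selinux is a hypothetical helper that works on the file's text; reading and writing the real file is up to you.

```python
import re


def disable_selinux(config_text: str) -> str:
    """Return config_text with the SELINUX= mode set to 'disabled'.

    Only lines that start with 'SELINUX=' are rewritten. 'SELINUXTYPE=' lines
    do not match, because the pattern needs '=' right after 'SELINUX'.
    Commented lines ('#SELINUX=...') do not match either.
    """
    return re.sub(r"^SELINUX=.*$", "SELINUX=disabled", config_text, flags=re.MULTILINE)


sample = "SELINUX=enforcing\nSELINUXTYPE=targeted\n"
print(disable_selinux(sample), end="")
```

Matching on the full "SELINUX=" prefix, anchored at the start of a line, is what keeps SELINUXTYPE untouched.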
    
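    • Change the hostname (this is the CentOS 7 command; look up the equivalent for other Linux distributions): hostnamectl set-hostname <hostname>. Here the NameNode host is named nn, the SecondaryNameNode host snn, and the three DataNodes dn1, dn2 and dn3.
    • Edit the hosts file (/etc/hosts) on all 5 machines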
    127.0.0.1       localhost       localhost
    192.168.10.219  nn              nn
    192.168.10.218  snn             snn
    192.168.10.217  dn1             dn1
    192.168.10.216  dn2             dn2
    192.168.10.215  dn3             dn3
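Every machine needs all five entries, not only its own. A small sketch, assuming the (IP, hostname) pairs from the table above, that returns just the lines still missing from an existing hosts file, so it can be re-run without piling up duplicates. missing_hosts_lines is a hypothetical helper, not part of the author's script.

```python
def missing_hosts_lines(existing_text, nodes):
    """Return 'ip  name  name' lines for nodes whose hostname is not mapped yet."""
    known = set()
    for line in existing_text.splitlines():
        line = line.split("#", 1)[0].strip()   # drop comments and whitespace
        if line:
            known.update(line.split()[1:])     # every name after the IP address
    return [f"{ip}  {name}  {name}" for ip, name in nodes if name not in known]


cluster = [
    ("192.168.10.219", "nn"),
    ("192.168.10.218", "snn"),
    ("192.168.10.217", "dn1"),
    ("192.168.10.216", "dn2"),
    ("192.168.10.215", "dn3"),
]
existing = "127.0.0.1 localhost localhost\n192.168.10.219 nn nn\n"
print("\n".join(missing_hosts_lines(existing, cluster)))
```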
    
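    • Check which JDK version your Hadoop release is compatible with. For Hadoop 1.2.1 it is JDK 6, and the official wiki says "It is built and tested on both OpenJDK and Oracle (HotSpot)'s JDK/JRE.", so simply run yum -y install java-1.6.0-openjdk.x86_64 on all 5 machines. To see where the JDK was installed, use: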
      [root@localhost hadoop]# rpm -ql java-1.6.0-openjdk.x86_64
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/java
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/keytool
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/orbd
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/pack200
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/policytool
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/rmid
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/rmiregistry
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/servertool
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/tnameserv
      /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/unpack200
      # So the JDK is installed under /usr/lib/jvm
      [root@localhost hadoop]# ls /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin
      java  keytool  orbd  pack200  policytool  rmid  rmiregistry  servertool  tnameserv  unpack200
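The JAVA_HOME that hadoop-env.sh needs is the directory two levels above the java binary listed by rpm -ql (the script further down derives it with find and awk). A small sketch of that rule with os.path, using the path from the listing above; java_home_from_binary is a hypothetical helper.

```python
import os.path


def java_home_from_binary(java_path):
    """Given .../jre/bin/java, return .../jre (two directory levels up)."""
    bin_dir = os.path.dirname(java_path)   # .../jre/bin
    return os.path.dirname(bin_dir)        # .../jre


print(java_home_from_binary(
    "/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/java"))
# /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre
```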
      
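      • About passwordless login:
        • "Passwordless" does not mean unauthenticated: a key check replaces the password, and it takes two requests. Suppose A wants to log in to B without a password.
        • In the first request, A offers B its public key. B looks for that key in its ~/.ssh/authorized_keys; if it is there, B replies that the key is acceptable.
        • In the second request, A sends a signature made with its private key over data tied to this session. B checks the signature against the stored public key and, if it is valid, logs A in. The private key never leaves A, and no password is exchanged.
        • In other words, if A wants to log in to B, B must have A's public key.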
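      • Pick one machine that can log in to the other 4 without a password. Passwordless login to itself is optional, but if you skip it, starting HDFS pauses to ask for a password before it continues. Here I chose the nn node (any node would do; it does not have to be the NameNode), which can log in to itself and to the other four machines without a password.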
          # Install ssh (CentOS 7 ships OpenSSH by default as openssh-server/openssh-clients, so yum finds no package named "ssh" and there is nothing to do) ####
          [root@localhost yum.repos.d]# yum install -y ssh
          Loaded plugins: fastestmirror
          Loading mirror speeds from cached hostfile
           * base: mirrors.aliyun.com
           * extras: mirrors.tuna.tsinghua.edu.cn
           * updates: mirrors.aliyun.com
          No package ssh available.
          Error: Nothing to do
          # Install rsync ####################################################################
          [root@localhost yum.repos.d]# yum install -y rsync
          Loaded plugins: fastestmirror
          Loading mirror speeds from cached hostfile
           * base: mirrors.aliyun.com
           * extras: mirrors.tuna.tsinghua.edu.cn
           * updates: mirrors.aliyun.com
          Resolving Dependencies
          --> Running transaction check
          ---> Package rsync.x86_64 0:3.0.9-18.el7 will be installed
          --> Finished Dependency Resolution
        
          Dependencies Resolved
        
          =========================================================================================
           Package           Arch               Version                     Repository        Size
          =========================================================================================
          Installing:
           rsync             x86_64             3.0.9-18.el7                base             360 k
        
          Transaction Summary
          =========================================================================================
          Install  1 Package
        
          Total download size: 360 k
          Installed size: 732 k
          Downloading packages:
          rsync-3.0.9-18.el7.x86_64.rpm                                     | 360 kB  00:00:00     
          Running transaction check
          Running transaction test
          Transaction test succeeded
          Running transaction
            Installing : rsync-3.0.9-18.el7.x86_64                                             1/1 
            Verifying  : rsync-3.0.9-18.el7.x86_64                                             1/1 
        
          Installed:
            rsync.x86_64 0:3.0.9-18.el7                                                            
        
          Complete!
          [root@localhost yum.repos.d]# ssh localhost
          The authenticity of host 'localhost (::1)' can't be established.
          ECDSA key fingerprint is SHA256:3h7izAi6QdCeHwDrb8PdeeoMzaJH0zP4n75SQBxlSr8.
          ECDSA key fingerprint is MD5:3a:e3:ca:15:c7:24:cf:56:37:27:31:70:14:70:d5:01.
          Are you sure you want to continue connecting (yes/no)? yes
          Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
          root@localhost's password: 
          # Generate the key pair ##############################################################
          [root@localhost yum.repos.d]# ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
          Generating public/private rsa key pair.
          Your identification has been saved in /root/.ssh/id_rsa.
          Your public key has been saved in /root/.ssh/id_rsa.pub.
          The key fingerprint is:
          SHA256:0kVvd1pHxg8NSCq84SRl2Yi0Zr48LjA8LNz4pxx3Cms root@localhost.localdomain
          The key's randomart image is:
          +---[RSA 2048]----+
          |     ...o+.....+o|
          |      .=o..o. .oo|
          |      = = o o ..=|
          |     + = = . . +o|
          |.oo   o S     .  |
          |.o*. . o         |
          | ..* .+.         |
          |  .E*oo.         |
          |  .+oo.          |
          +----[SHA256]-----+
          # Set up authorized_keys: append the public key to the local authorization file ######
          [root@localhost yum.repos.d]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
          # Set permissions ####################################################################
          [root@localhost yum.repos.d]# chmod 0600 ~/.ssh/authorized_keys
          # Test passwordless login to this machine ############################################
          [root@localhost yum.repos.d]# ssh localhost
          Last login: Wed Dec 20 22:36:50 2017 from 192.168.10.211
          # Exit ###############################################################################
          [root@localhost ~]# exit
          logout
          Connection to localhost closed.
          [root@localhost yum.repos.d]# 
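To make the two requests described above concrete, here is a toy model in Python. It is not how OpenSSH is implemented: it uses textbook RSA with tiny primes (completely insecure) and a made-up session string, only to show that B needs nothing but A's public key in authorized_keys, while A's private key never leaves A.

```python
import hashlib

# Toy textbook-RSA key pair for A (p=61, q=53): far too small to be secure.
N, E, D = 3233, 17, 2753   # modulus, public exponent, private exponent


def digest(data: bytes) -> int:
    """Hash the data and reduce it into the range [0, N)."""
    return int.from_bytes(hashlib.sha256(data).digest(), "big") % N


def sign(data: bytes) -> int:
    """A signs with its private exponent D."""
    return pow(digest(data), D, N)


def verify(data: bytes, signature: int, public_key) -> bool:
    """B checks the signature using only the stored public key (n, e)."""
    n, e = public_key
    return pow(signature, e, n) == digest(data) % n


authorized_keys = {(N, E)}   # B's authorized_keys holds A's public key

# Request 1: A offers its public key; B checks that it is authorized.
offered = (N, E)
print(offered in authorized_keys)          # True

# Request 2: A signs data tied to this session; B verifies it.
session_data = b"session-id|root|ssh-connection"
sig = sign(session_data)
print(verify(session_data, sig, offered))  # True
```

Without A's public key in B's authorized_keys, request 1 already fails, which is why the key has to be copied to B.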
        
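    • Let 219 (nn) log in to the other four machines without a password [copy 219's public key to each of them and append it to that machine's authorization file; each machine should also be able to log in to itself, otherwise you get warnings]
      • Example (on dn1):
      [root@dn1 ~]# scp nn:/root/.ssh/id_rsa.pub /root/219_id_rsa.pub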
      The authenticity of host 'nn (192.168.10.219)' can't be established.
      ECDSA key fingerprint is SHA256:3h7izAi6QdCeHwDrb8PdeeoMzaJH0zP4n75SQBxlSr8.
      ECDSA key fingerprint is MD5:3a:e3:ca:15:c7:24:cf:56:37:27:31:70:14:70:d5:01.
      Are you sure you want to continue connecting (yes/no)? yes
      Warning: Permanently added 'nn,192.168.10.219' (ECDSA) to the list of known hosts.
      root@nn's password: 
      id_rsa.pub                                                                      100%  389    80.7KB/s   00:00
      [root@dn1 ~]# cat /root/219_id_rsa.pub >> ~/.ssh/authorized_keys    
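Appending with >> adds the key again every time the step is repeated. A sketch of an idempotent version in Python, assuming you run it on the target machine with the path of its authorized_keys: it only appends a key that is not already present and then applies the same 0600 mode as the chmod step earlier. add_authorized_key is a hypothetical helper, and the key in the usage lines is a placeholder, not a real key.

```python
import os
import tempfile


def add_authorized_key(auth_path, pubkey_line):
    """Append pubkey_line to authorized_keys unless it is already there.

    Returns True if the key was added. The file mode is set to 0600 either way.
    """
    pubkey_line = pubkey_line.strip()
    text = ""
    if os.path.exists(auth_path):
        with open(auth_path, encoding="utf8") as f:
            text = f.read()
    if pubkey_line in (line.strip() for line in text.splitlines()):
        added = False
    else:
        with open(auth_path, "a", encoding="utf8") as f:
            if text and not text.endswith("\n"):
                f.write("\n")            # keep the new key on its own line
            f.write(pubkey_line + "\n")
        added = True
    os.chmod(auth_path, 0o600)
    return added


# Usage with a throwaway file and a placeholder key:
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "authorized_keys")
    key = "ssh-rsa AAAAplaceholder root@nn"
    print(add_authorized_key(path, key))   # True
    print(add_authorized_key(path, key))   # False, no duplicate line
```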
      
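  • Configure core-site.xml in the conf/ directory under the Hadoop root
<configuration>
    <!-- HDFS access address, i.e. the NameNode's address -->
     <property>
         <name>fs.default.name</name>
         <value>hdfs://nn:9000</value>
     </property>
    <!-- Base directory for Hadoop's working files, including the NameNode's working directory -->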
     <property>
         <name>hadoop.tmp.dir</name>
         <value>/opt/hadoop</value>
     </property>
</configuration>
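Every Hadoop *-site.xml file has this same configuration/property/name/value shape. A minimal sketch with Python's standard xml.etree.ElementTree (the script further down uses lxml instead) that builds the two core-site.xml properties and reads one back; hadoop_conf_xml and conf_value are hypothetical helpers.

```python
import xml.etree.ElementTree as ET


def hadoop_conf_xml(props):
    """Build a Hadoop-style <configuration> document from (name, value) pairs."""
    root = ET.Element("configuration")
    for name, value in props:
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


def conf_value(xml_text, name):
    """Return the <value> of the property called name, or None if it is absent."""
    for prop in ET.fromstring(xml_text).iter("property"):
        if prop.findtext("name") == name:
            return prop.findtext("value")
    return None


core_site = hadoop_conf_xml([
    ("fs.default.name", "hdfs://nn:9000"),
    ("hadoop.tmp.dir", "/opt/hadoop"),
])
print(core_site)
print(conf_value(core_site, "fs.default.name"))   # hdfs://nn:9000
```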
  • Configure conf/hdfs-site.xml under the Hadoop root
<configuration>
     <property>
        <!-- HDFS replication factor; must be less than or equal to the number of DataNodes -->
         <name>dfs.replication</name>
         <value>3</value>
     </property>
</configuration>
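The reason for the limit: HDFS places each replica of a block on a different DataNode, so with fewer DataNodes than dfs.replication the blocks stay under-replicated. A sketch of a pre-flight check, assuming the hdfs-site.xml text and the DataNode count (3 here, dn1 to dn3) as inputs; the helper names are hypothetical, and the fallback of 3 is Hadoop's default replication factor.

```python
import xml.etree.ElementTree as ET

HDFS_SITE = """<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>"""


def replication_factor(hdfs_site_xml, default=3):
    """Read dfs.replication from hdfs-site.xml text; fall back to default."""
    for prop in ET.fromstring(hdfs_site_xml).iter("property"):
        if prop.findtext("name") == "dfs.replication":
            return int(prop.findtext("value"))
    return default


def check_replication(hdfs_site_xml, datanode_count):
    """Raise ValueError if the replication factor exceeds the DataNode count."""
    replication = replication_factor(hdfs_site_xml)
    if replication > datanode_count:
        raise ValueError(f"dfs.replication={replication} > {datanode_count} DataNodes")
    return replication


print(check_replication(HDFS_SITE, 3))   # 3
```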
  • Configure conf/slaves under the Hadoop root
# DataNodes, one IP or hostname per line
dn1
dn2
dn3
  • Configure conf/masters under the Hadoop root
# SecondaryNameNode host (not the NameNode), IP or hostname
snn
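As far as I can tell, Hadoop 1.x's bin/slaves.sh strips '#' comments and blank lines from these host files before looping over the hosts, which is why the comment lines above are harmless. A Python sketch of the same rule, showing which hosts each file yields; read_host_file is a hypothetical helper.

```python
def read_host_file(text):
    """Return the hosts in a slaves/masters file, ignoring '#' comments and blank lines."""
    hosts = []
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()   # drop the comment and surrounding spaces
        if line:
            hosts.append(line)
    return hosts


slaves = "# DataNodes, one IP or hostname per line\ndn1\ndn2\ndn3\n"
masters = "# SecondaryNameNode host, IP or hostname\nsnn\n"
print(read_host_file(slaves))    # ['dn1', 'dn2', 'dn3']
print(read_host_file(masters))   # ['snn']
```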
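  • So where is the NameNode host configured? In core-site.xml: fs.default.name holds the nn node's address. Also, start-dfs.sh starts the NameNode on the machine it is run on, so run the commands below on nn.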
  • Add HADOOP_HOME to /etc/profile
    # Use your own Hadoop installation path
    export HADOOP_HOME=/root/hadoop-1.2.1
    export PATH=$HADOOP_HOME/bin:$PATH
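The Python script below appends these two lines with sed '$a', so running it twice leaves duplicate exports in /etc/profile. A sketch of an idempotent alternative, assuming you read the profile into a string and write the result back yourself; add_exports is a hypothetical helper.

```python
def add_exports(profile_text, hadoop_home):
    """Append the HADOOP_HOME and PATH exports to profile text unless already present."""
    wanted = [
        f"export HADOOP_HOME={hadoop_home}",
        "export PATH=$HADOOP_HOME/bin:$PATH",
    ]
    existing = profile_text.splitlines()
    missing = [line for line in wanted if line not in existing]
    if not missing:
        return profile_text
    if profile_text and not profile_text.endswith("\n"):
        profile_text += "\n"               # keep the exports on their own lines
    return profile_text + "\n".join(missing) + "\n"


print(add_exports("", "/root/hadoop-1.2.1"), end="")
```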
    
  • source /etc/profile
  • hadoop namenode -format
  • start-dfs.sh
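When these two commands are run by a tool over SSH rather than typed in an interactive login (the Python script below uses paramiko's exec_command), the remote shell is typically not a login shell, so the HADOOP_HOME and PATH lines in /etc/profile are not read and `hadoop` may not be found. One way around it, sketched here under that assumption, is to wrap the command in `bash -lc`; login_shell is a hypothetical helper that only builds the command string.

```python
import shlex


def login_shell(cmd):
    """Wrap cmd so bash runs it as a login shell, which reads /etc/profile first."""
    return "bash -lc " + shlex.quote(cmd)


print(login_shell("hadoop namenode -format"))   # bash -lc 'hadoop namenode -format'
print(login_shell("start-dfs.sh"))              # bash -lc start-dfs.sh
```

shlex.quote only adds quotes when the command contains characters the shell would split or interpret, which is why start-dfs.sh comes back unquoted.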
  • Python installation script
  • Server configuration file (./server.csv)
ip,user,pwd,hostname,nodes
192.168.10.215,root,mico,dn1,DN
192.168.10.216,root,mico,dn2,DN
192.168.10.217,root,mico,dn3,DN
192.168.10.218,root,mico,snn,SNN
192.168.10.219,root,mico,nn,NN
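The columns are IP, SSH user, password, hostname and role (NN = NameNode, SNN = SecondaryNameNode, DN = DataNode). The script's analysis_nodes indexes rows by position; a sketch using csv.DictReader looks columns up by header name instead, with the CSV above inlined. group_by_role is a hypothetical helper.

```python
import csv
import io

SERVER_CSV = """ip,user,pwd,hostname,nodes
192.168.10.215,root,mico,dn1,DN
192.168.10.216,root,mico,dn2,DN
192.168.10.217,root,mico,dn3,DN
192.168.10.218,root,mico,snn,SNN
192.168.10.219,root,mico,nn,NN
"""


def group_by_role(csv_text):
    """Map each role in the 'nodes' column to the list of hostnames with that role."""
    roles = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        roles.setdefault(row["nodes"], []).append(row["hostname"])
    return roles


print(group_by_role(SERVER_CSV))
# {'DN': ['dn1', 'dn2', 'dn3'], 'SNN': ['snn'], 'NN': ['nn']}
```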
  • Software list file
software
java-1.6.0-openjdk.x86_64
rsync
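The script's install_software hard-codes rsync and the JDK package instead of reading this list. If you wanted it to use the list, a sketch could look like this, assuming the file is a one-column CSV with a "software" header as shown; yum_install_cmd is a hypothetical helper that only builds the command string.

```python
import csv
import io
import shlex

SOFTWARE_CSV = "software\njava-1.6.0-openjdk.x86_64\nrsync\n"


def yum_install_cmd(csv_text):
    """Build one 'yum -y install' command for every package listed under the header."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)                                   # skip the 'software' header
    packages = [row[0].strip() for row in reader if row and row[0].strip()]
    return "yum -y install " + " ".join(shlex.quote(p) for p in packages)


print(yum_install_cmd(SOFTWARE_CSV))
# yum -y install java-1.6.0-openjdk.x86_64 rsync
```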
  • Version and path configuration file (./config.ini)
;Software versions
[version]
;Hadoop version
hadoop=1.2.1
;OpenJDK version
openjdk=1.6.0

;Server paths
[path]
hadoop=/opt/
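This is roughly how the script turns the ini file into names: read_ini wraps configparser, fetch_hadoop builds the tarball name, install_software the JDK package, and upload_hadoop the remote path. A sketch of those three derivations with the file above inlined; configparser treats the lines starting with ';' as comments by default.

```python
import configparser

CONFIG_INI = """;Software versions
[version]
;Hadoop version
hadoop=1.2.1
;OpenJDK version
openjdk=1.6.0

;Server paths
[path]
hadoop=/opt/
"""

conf = configparser.ConfigParser()
conf.read_string(CONFIG_INI)

hadoop_version = conf.get("version", "hadoop")
tarball = f"hadoop-{hadoop_version}.tar.gz"                     # as in fetch_hadoop
jdk_package = f"java-{conf.get('version', 'openjdk')}-openjdk.x86_64"  # as in install_software
remote_tarball = conf.get("path", "hadoop") + tarball           # as in upload_hadoop

print(tarball)          # hadoop-1.2.1.tar.gz
print(jdk_package)      # java-1.6.0-openjdk.x86_64
print(remote_tarball)   # /opt/hadoop-1.2.1.tar.gz
```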
  • Python script
#!/usr/bin/env python
# -*- coding: utf-8 -*-
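# Stop the firewall: systemctl stop firewalld.service
# Keep the firewall from starting at boot: systemctl disable firewalld.service
# Disable SELinux: vi /etc/selinux/config
# Apply the SELinux change now: setenforce 0
# Set the hostnames
# Edit the hosts file
# Reboot: reboot
# Install the JDK
# Passwordless login
# yum install -y ssh
# yum install -y rsync
# Generate the key pair: ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
# Append the public key to the local authorized_keys
# Test passwordless login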

# Extract the Hadoop tarball
# Set JAVA_HOME in hadoop-env.sh
# Configure core-site.xml:
#     <configuration>
#         <!-- HDFS access address, i.e. the NameNode's address -->
#          <property>
#              <name>fs.default.name</name>
#              <value>hdfs://nn:9000</value>
#          </property>
#         <!-- Base directory for Hadoop's working files, including the NameNode's working directory -->
#          <property>
#              <name>hadoop.tmp.dir</name>
#              <value>/opt/hadoop</value>
#          </property>
#     </configuration>
# Configure hdfs-site.xml:
#     <configuration>
#          <property>
#             <!-- HDFS replication factor; must be less than or equal to the number of DataNodes -->
#              <name>dfs.replication</name>
#              <value>3</value>
#          </property>
#     </configuration>
# Configure the DataNodes in slaves (IP or hostname)
#     dn1
#     dn2
#     dn3
# Configure the SecondaryNameNode in masters (IP or hostname)
#     snn
# Add Hadoop to the system environment variables
# hadoop namenode -format
# start-dfs.sh

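import configparser
import csv
import os
import tarfile
import urllib.request

import paramiko                  # third-party: SSH/SFTP client
from lxml import etree           # third-party: XML editing
from tqdm import tqdm            # third-party: progress bars

# The author's own helper module for coloured console output (not included in the post)
from www.colorprint.color_print import ColorPrint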


# Insert text at the very beginning of a file
def insert_first_line(file, text):
    with open(file, 'r+', encoding="utf8") as f:
        content = f.read()
        f.seek(0, 0)
        f.write(text + content)


# Write lines to a file, newline-separated, with no newline after the last line
def write_lines(file, lines):
    with open(file, 'w', encoding='utf8') as f:
        f.write("\n".join(lines))


# Progress hook for urlretrieve that drives a tqdm progress bar
def process_hook(t):
    last_b = [0]

    def inner(b=1, bsize=1, tsize=None):
        """
        b  : int, optional
            Number of blocks just transferred [default: 1].
        bsize  : int, optional
            Size of each block (in tqdm units) [default: 1].
        tsize  : int, optional
            Total size (in tqdm units). If [default: None] remains unchanged.
        """
        if tsize is not None:
            t.total = tsize
        t.update((b - last_b[0]) * bsize)
        last_b[0] = b

    return inner


conf = configparser.ConfigParser()


# Read one value from the ini configuration file
def read_ini(file, section, _name_):
    conf.read(file,encoding="utf8")
    return conf.get(section, _name_)


# Read the CSV file, skipping the header row
def read(file):
    config_ = []
    with open(file, encoding='utf8') as f:
        f_csv = csv.reader(f)
        next(f_csv)
        for row in f_csv:
            config_.append(row)
    return config_

servers = read("./server.csv")
data_nodes = []
name_node = []
second_name_node = []
hadoop_home = ""
hadoop_name = ""
hadoop_file_name = ""


# Run one command over SSH and return all of its stdout lines as a single string
def ssh(ip, user_name, pass_wd, cmd):
    result_str = ""
    err_str = ""
    try:
        ssh_client = paramiko.SSHClient()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh_client.connect(ip, 22, user_name, pass_wd, timeout=5)
        # print("Running remote command: server ip: %s, command: %s" % (ip, cmd))
        std_in, stdout, stderr = ssh_client.exec_command(cmd)
        # std_in.write("Y")   # simple interaction: answer 'Y'
        out = stdout.readlines()
        err = stderr.readlines()   # all stderr lines, not only the first one
        # Echo to the screen
        for o in out:
            result_str += o
            print(o)
        for e in err:
            err_str += e
        if len(err_str) != 0:
            print(err_str)
        ssh_client.close()
    except Exception as e:
        print('%s\tError:%s\n' % (ip, e))

    return result_str


def upload(host_ip, username, password, local_path, remote_path):
    t = paramiko.Transport((host_ip, 22))
    t.connect(username=username, password=password)  # log in to the remote server
    sftp = paramiko.SFTPClient.from_transport(t)  # open an SFTP session on this connection
    sftp.put(local_path, remote_path)
    t.close()


def download(host_ip, username, password, remote_path, local_path):
    t = paramiko.Transport((host_ip, 22))
    t.connect(username=username, password=password)  # log in to the remote server
    sftp = paramiko.SFTPClient.from_transport(t)  # open an SFTP session on this connection
    src = remote_path
    des = local_path
    sftp.get(src, des)
    t.close()


# Stop the firewall
def stop_firewall():
    for l in servers:
        rs = ssh(l[0], l[1], l[2], "systemctl stop firewalld.service")
        print(rs)


# Keep the firewall from starting at boot: systemctl disable firewalld.service
def disable_firewall():
    for l in servers:
        rs = ssh(l[0], l[1], l[2], "systemctl disable firewalld.service")
        print(rs)


# Disable SELinux in /etc/selinux/config: sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
def edit_selinux_config():
    for l in servers:
        rs = ssh(l[0], l[1], l[2], "sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config")
        print(rs)


# Apply the SELinux change now: setenforce 0
def selinux_config_effective():
    for l in servers:
        rs = ssh(l[0], l[1], l[2], "setenforce 0")
        print(rs)


# Set each server's hostname
def set_host_name():
    for l in servers:
        rs = ssh(l[0], l[1], l[2], "hostnamectl set-hostname "+ l[3])
        print(rs)


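# Edit the hosts file: every server gets an entry for every server,
# otherwise each machine could resolve only its own hostname
def edit_host_file():
    entries = "\n".join(l[0] + "  " + l[3] + "    " + l[3] for l in servers)
    for l in servers:
        cmd = "echo '" + entries + "' >> /etc/hosts"
        rs = ssh(l[0], l[1], l[2], cmd)
        print(rs)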


# Reboot every server
def reboot():
    for l in servers:
        cmd = "reboot"
        ssh(l[0], l[1], l[2], cmd)


# Wait until every server answers over SSH again (e.g. after the reboot)
def wait_start_up():
    while(True):
        all_str = ""
        try:
            for l in servers:
                rs = ssh(l[0], l[1], l[2], "ls /etc/hosts")
                if rs == '':
                    rs = 'ERROR'
                all_str += rs
        except Exception as e:
            continue
        else:
            if all_str.find("ERROR")!=-1:
                continue
            else:
                break


def execute_cmd(cmd):
    for l in servers:
        ssh(l[0], l[1], l[2], cmd)


# Install rsync, then the JDK (yum -y install java-1.6.0-openjdk.x86_64).
# ssh needs no install step: CentOS 7 already ships OpenSSH.
def install_software():
    yum_y_install_s = "yum -y install " + "rsync"
    execute_cmd(yum_y_install_s)
    jdk_version = read_ini("./config.ini", "version", "openjdk")
    yum_y_install_s = "yum -y install java-" + jdk_version + "-openjdk.x86_64"
    execute_cmd(yum_y_install_s)


# Passwordless login: the nn node can log in to every other machine without a password.
# analysis_nodes() must have run first so that name_node is set.
def silent_login():
    ColorPrint.print_info("DataNodes: " + str(data_nodes))
    ColorPrint.print_info("NameNode: " + str(name_node))
    ColorPrint.print_info("SecondaryNameNode: " + str(second_name_node))
    gen_pubkey()
    for l in servers:
        if name_node[0] == l[0]:
            continue
        else:
            append_auth_keys(name_node, l)


# Copy source's public key to the des server so that source can log in to des without a password.
# The paths assume the home directory is /<user>, which holds for root (/root).
def append_auth_keys(source, des):
    ColorPrint.print_info(str(source[0]) + ": copying public key to " + str(des[0]))
    download(source[0], source[1], source[2], "/" + source[1] + "/.ssh/id_rsa.pub", source[0] + "id_rsa.pub")
    local_pub = source[0] + "id_rsa.pub"
    up_pub = "/" + source[1] + "/.ssh/" + source[0] + "id_rsa.pub"
    upload(des[0], des[1], des[2], local_pub, up_pub)
    append_keys = "cat /" + source[1] + "/.ssh/" + source[0] + "id_rsa.pub >> ~/.ssh/authorized_keys"
    ssh(des[0], des[1], des[2], append_keys)
    ColorPrint.print_info(str(source[0]) + ": public key copied to " + str(des[0]) + " successfully!")
    os.remove(local_pub)
    remove_des_keys = "rm -f " + up_pub
    ssh(des[0], des[1], des[2], remove_des_keys)


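# Generate a fresh key pair on every node and let each node trust its own key
def gen_pubkey():
    ColorPrint.print_info("Generating key pairs on all nodes....")
    for l in servers:
        # delete any old pair first so ssh-keygen does not stop to ask about overwriting
        rm_rsa = "rm -f /" + l[1] + "/.ssh/id_rsa*"
        ssh(l[0], l[1], l[2], rm_rsa)
    for l in servers:
        gen_rsa = "ssh-keygen -t rsa -P '' -f /" + l[1] + "/.ssh/id_rsa"
        ssh(l[0], l[1], l[2], gen_rsa)
    ColorPrint.print_info("Key pairs generated on all nodes....")
    ColorPrint.print_info("Adding each node's own public key to its authorized_keys....")
    for l in servers:
        append_auth = "cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys && chmod 0600 ~/.ssh/authorized_keys"
        ssh(l[0], l[1], l[2], append_auth)
    ColorPrint.print_info("Each node's own public key added to its authorized_keys....")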


def analysis_nodes():
    global name_node
    global second_name_node
    for l in servers:
        if 'NN' == l[4]:
            name_node = l
        elif 'SNN' == l[4]:
            second_name_node = l
        else:
            data_nodes.append(l)


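# Locate the JDK: find executable files named 'java' and cut each path at 'bin', e.g.
# /usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre/bin/java -> .../jre/
# Only the first match is returned, without the trailing newline.
def find_jdk(dest):
    find = "find / -name 'java' -type f -perm -111 |awk -F'bin' '{print $1}'"
    rs = ssh(dest[0], dest[1], dest[2], find)
    lines = rs.strip().splitlines()
    return lines[0] if lines else ""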


def fetch_hadoop():
    global hadoop_name
    global hadoop_file_name
    base_url = "http://mirrors.hust.edu.cn/apache/hadoop/common/"
    hadoop_version = read_ini("./config.ini", "version", "hadoop")
    base_name = "hadoop-"+hadoop_version
    hadoop_name = base_name
    base_name_dir = base_name + "/"
    base_url += base_name_dir
    file_name = base_name + ".tar.gz"
    hadoop_file_name = file_name
    base_url += file_name

    with tqdm(unit='B', unit_scale=True, leave=True, miniters=1,desc=file_name) as t:
        urllib.request.urlretrieve(base_url, filename=file_name, reporthook=process_hook(t), data=None)


# Extract the Hadoop tarball, then delete it
def un_tar_hadoop(file_name, base_dir="./"):
    with tarfile.open(file_name) as tar:
        tar.extractall(path=base_dir)
    os.remove(file_name)


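# Edit hadoop-env.sh: put the JAVA_HOME export on its own first line, e.g.
# export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.41.x86_64/jre
def edit_hadoop_env(file_name, java_home):
    # the trailing "\n" keeps the export from being glued to the file's original first line
    insert_first_line(file_name, "export JAVA_HOME=" + java_home + "\n")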


# Configure core-site.xml by appending two <property> elements to <configuration>:
#   fs.default.name = hdfs://nn:9000   HDFS access address, i.e. the NameNode's address
#   hadoop.tmp.dir  = /opt/hadoop      base directory for Hadoop's working files,
#                                      including the NameNode's working directory
def edit_core_site(file_path):
    tree = xml_parser(file_path)
    root = tree.getroot()
    name_property_node = gen_xml_node(root, 'property')
    dir_property_node = gen_xml_node(root, 'property')
    # HDFS access address, i.e. the NameNode's address
    gen_xml_node(name_property_node, 'name', 'fs.default.name')
    gen_xml_node(name_property_node, 'value', 'hdfs://nn:9000')
    # Base directory for Hadoop's working files, including the NameNode's working directory
    gen_xml_node(dir_property_node, 'name', 'hadoop.tmp.dir')
    gen_xml_node(dir_property_node, 'value', '/opt/hadoop')
    tree.write(file_path, pretty_print=True, xml_declaration=True, encoding='utf8')


def xml_parser(file_path):
    parser = etree.XMLParser(encoding="utf8", remove_blank_text=True)
    tree = etree.parse(file_path,parser)
    return tree


def gen_xml_node(parent_node, name, value=''):
    node = etree.Element(name)
    node.text = value
    parent_node.append(node)
    return node


# Configure hdfs-site.xml by appending one <property> element to <configuration>:
#   dfs.replication = 3   HDFS replication factor; must not exceed the number of DataNodes
def edit_hdfs_site(file_path):
    tree = xml_parser(file_path)
    root = tree.getroot()
    replication_node = gen_xml_node(root, 'property')
    # HDFS replication factor; must not exceed the number of DataNodes
    gen_xml_node(replication_node, 'name', 'dfs.replication')
    gen_xml_node(replication_node, 'value', '3')
    tree.write(file_path, pretty_print=True, xml_declaration=True, encoding='utf8')


# Write the DataNodes into slaves, one IP or hostname per line, e.g.
#     dn1
#     dn2
#     dn3
def edit_slaves(file_path, data_nodes):
    slaves = []
    for d in data_nodes:
        slaves.append(str(d[3]))
    write_lines(file_path,slaves)


# Write the SecondaryNameNode into masters (IP or hostname), e.g.
#     snn
def edit_masters(file_path, second_name_node):
    masters = [str(second_name_node[3])]
    write_lines(file_path,masters)


# Re-pack the edited Hadoop directory
def tar_hadoop(file_path,tar_name):
    tar_file(file_path,tar_name)


# Count the files under a folder (used to size the progress bar)
def file_count(folder):
    file_counter = 0
    for dir_path, dirs, files in (os.walk(folder)):
        file_counter += len(files)
    return file_counter


# Pack a directory into a .tar.gz, showing progress
def tar_file(input_path, tar_file_name):
    tar = tarfile.open(tar_file_name, "w:gz")
    count = file_count(input_path)
    with tqdm(total=count, unit='file', desc="Packing files into " + tar_file_name) as pbar:
        for dir_path, dirs, files in os.walk(input_path):
            for filename in files:
                full_path = os.path.join(dir_path, filename)
                tar.add(full_path)
                pbar.update(1)
    tar.close()


######################################################################## untested
# Upload the packed Hadoop tarball to every server
def upload_hadoop(local_path):
    global hadoop_home
    hadoop_prefix = read_ini("./config.ini", "path", "hadoop")
    sep = local_path.rfind("/")
    if sep != -1:
        file_name = local_path[sep+1:]
        hadoop_home = hadoop_prefix + file_name
    else:
        hadoop_home = hadoop_prefix + local_path
    for l in servers:
        upload(l[0], l[1], l[2], local_path, hadoop_home)


# Extract the Hadoop tarball on every server
def extract_server_hadoop(hadoop_tar_path):
    hadoop_prefix = read_ini("./config.ini", "path", "hadoop")
    for l in servers:
        cmd = "tar -C " + hadoop_prefix + " -xvf " + hadoop_tar_path
        ssh(l[0], l[1], l[2], cmd)


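# Add Hadoop to the system environment variables by appending to /etc/profile
# (sed -i '$a\...' /etc/profile), then make the scripts in bin/ executable
def add_hadoop_home(hadoop_home):
    hadoop = hadoop_home.replace(".tar.gz", "")
    hadoop_ = "export HADOOP_HOME=" + hadoop
    path = "export PATH=$HADOOP_HOME/bin:$PATH"
    execute_cmd("sed -i '$a\\" + hadoop_ + "' /etc/profile")
    execute_cmd("sed -i '$a\\" + path + "' /etc/profile")
    # "source /etc/profile" is not run here: each exec_command starts a new shell,
    # so it would not carry over; later commands go through a login shell instead.
    bin_ = "chmod -R +x " + hadoop + "/bin/"
    execute_cmd(bin_)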


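# Format HDFS on the NameNode: hadoop namenode -format
# "bash -lc" runs the command in a login shell, so /etc/profile (HADOOP_HOME, PATH) is read first
def format_hdfs(name_nodes):
    ssh(name_nodes[0], name_nodes[1], name_nodes[2], "bash -lc 'hadoop namenode -format'")


# Start HDFS from the NameNode: start-dfs.sh
def start_hdfs(name_nodes):
    ssh(name_nodes[0], name_nodes[1], name_nodes[2], "bash -lc 'start-dfs.sh'")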

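
if __name__ == "__main__":
    # Work out the node roles first: silent_login() and the config edits below need them
    analysis_nodes()
    ColorPrint.print_info("DataNodes: " + str(data_nodes))
    ColorPrint.print_info("NameNode: " + str(name_node))
    ColorPrint.print_info("SecondaryNameNode: " + str(second_name_node))
    # # Stop the firewall
    # stop_firewall()
    # # Disable the firewall at boot
    # disable_firewall()
    # # Disable SELinux in its config file
    # edit_selinux_config()
    # # Apply the SELinux setting now
    # selinux_config_effective()
    # # Set the hostnames
    # set_host_name()
    # # Edit the hosts file
    # edit_host_file()
    # # Reboot
    # reboot()
    # # Wait for the reboot to finish
    # wait_start_up()
    # # Install software
    # install_software()
    # # Let the NameNode log in to the other machines without a password
    # silent_login()
    # Find the JDK; every node has the same OpenJDK package, so the path is the same everywhere
    jdk_home = find_jdk(name_node)
    print("jdk", jdk_home)
    # Download Hadoop
    fetch_hadoop()
    # Extract Hadoop
    un_tar_hadoop(hadoop_file_name)
    # Edit the configuration inside the extracted tree (conf/ in Hadoop 1.x)
    conf_dir = hadoop_name + "/conf/"
    edit_hadoop_env(conf_dir + "hadoop-env.sh", jdk_home)
    edit_core_site(conf_dir + "core-site.xml")
    edit_hdfs_site(conf_dir + "hdfs-site.xml")
    edit_slaves(conf_dir + "slaves", data_nodes)
    edit_masters(conf_dir + "masters", second_name_node)
    # Re-pack, upload, extract and register Hadoop on every server
    tar_hadoop(hadoop_name + "/", hadoop_file_name)
    upload_hadoop(hadoop_file_name)
    extract_server_hadoop(hadoop_home)
    add_hadoop_home(hadoop_home)
    # Format HDFS, then start it from the NameNode
    format_hdfs(name_node)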
    start_hdfs(name_node)
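wait_start_up above retries immediately in a tight loop and needs a full SSH login to succeed. A lighter alternative, sketched with only the standard library: poll TCP port 22 until sshd accepts connections, sleeping between attempts and giving up after a deadline. wait_for_port and its default timings are assumptions, not part of the original script. An open port only shows that sshd is listening, not that login will succeed.

```python
import socket
import time


def wait_for_port(host, port=22, timeout=300.0, interval=5.0):
    """Poll until a TCP connection to host:port succeeds; return False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True                     # something (e.g. sshd) is accepting connections
        except OSError:                         # refused, unreachable or unresolvable for now
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(interval, remaining))   # avoid a busy loop while the host reboots
```

You would call it once per IP from server.csv after reboot(), and continue with install_software() only when every call returns True.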


    Article link: https://www.haomeiwen.com/subject/fjntgxtx.html