16. YARN

Author: 逸章 | 2020-07-27

    YARN enables multiple applications to run simultaneously on the same shared cluster and allows applications to negotiate resources based on need. Resource allocation and management are therefore central to YARN.

    The MapReduce programming model should be decoupled from the resource-management mechanism.

    Hadoop 1.x was built to support only the MapReduce model; it could not run the newer alternative frameworks (Spark, Storm, and so on).


    The cluster-utilization problem in Hadoop 1.x: cluster resources were divided into fixed-size slots for map and reduce tasks. This meant there could be scenarios where the map slots were full while the reduce slots sat empty, or vice versa. This was definitely not an optimal utilization of resources, and it needed further optimization.

    YARN is designed to allocate resource containers to individual applications in a shared, secure, and multi-tenant manner (unlike MRv1, where the pre-allocated map slots and reduce slots could not be shared with each other).

    A key requirement for YARN when dealing with big data: moving computation is cheaper than moving data. This helps to minimize network congestion and increase the overall throughput of the system.

    The design decision behind the YARN architecture was to split the JobTracker's two major responsibilities, resource management and job scheduling/monitoring, into separate daemons: a cluster-level ResourceManager (RM) and a per-application ApplicationMaster (AM).

    1. How the ApplicationMaster, ResourceManager, and NodeManager communicate

    The ApplicationMaster exists at a per-application level. It is responsible for the application's life-cycle management and for negotiating the appropriate resources from the Scheduler.

    The NodeManager acts as a per-machine agent and is responsible for managing the life cycle of containers and for monitoring their resource usage.

    How the three components interact (a minimal AM-side sketch follows the list):
    1. When a job is submitted to YARN, the ResourceManager talks to an appropriate NodeManager and asks it to launch an ApplicationMaster. Once the ApplicationMaster is up, it registers itself with the ResourceManager; the registration carries the host it runs on, its RPC port, and a web URL for tracking the application's progress externally. The ResourceManager answers this registration RPC with a RegisterApplicationMasterResponse object, which, among other things, tells the ApplicationMaster the maximum resources a single container may request.
    2. After registering, the ApplicationMaster asks the ResourceManager for container resources. The ResourceManager answers with an AllocateResponse object, which contains commands the ApplicationMaster should act on and the list of resources, that is, Containers, allocated to the application.
    3. With the allocation in hand, the ApplicationMaster contacts the NodeManagers over RPC to launch the Containers. Once a Container is running, the ApplicationMaster periodically polls the NodeManager over RPC for the Container's status. If a Container fails, the ApplicationMaster goes back to the ResourceManager to request replacement resources; if a Container finishes successfully, the ApplicationMaster releases it via an RPC call.
    4. When all of the application's Containers have completed successfully, the ApplicationMaster tells the ResourceManager over RPC that the application has finished, and exits.
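
    This handshake maps directly onto YARN's Java client API. Below is a minimal AM-side sketch using the AMRMClient and NMClient classes; it only works when run inside the container that YARN launched for the ApplicationMaster, and the host name, tracking URL, container command, and resource sizes are illustrative placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.*;
    import org.apache.hadoop.yarn.client.api.AMRMClient;
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    import org.apache.hadoop.yarn.client.api.NMClient;
    import org.apache.hadoop.yarn.util.Records;

    public class AmSketch
    {
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();

            AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
            rmClient.init(conf);
            rmClient.start();

            NMClient nmClient = NMClient.createNMClient();
            nmClient.init(conf);
            nmClient.start();

            // Step 1: register with the RM (host, RPC port, tracking URL); the response
            // carries the largest resource a single container request may ask for.
            RegisterApplicationMasterResponse reg =
                    rmClient.registerApplicationMaster("am-host", 0, "http://am-host/track");
            System.out.println("Max single-container allocation: " + reg.getMaximumResourceCapability());

            // Step 2: request one container (1 GB, 1 vcore).
            Resource capability = Resource.newInstance(1024, 1);
            rmClient.addContainerRequest(
                    new ContainerRequest(capability, null, null, Priority.newInstance(0)));

            // Step 3: poll allocate() until the RM hands containers back, then ask the
            // corresponding NodeManager to launch each one.
            boolean launched = false;
            while (!launched) {
                AllocateResponse resp = rmClient.allocate(0.1f);
                for (Container c : resp.getAllocatedContainers()) {
                    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
                    ctx.setCommands(java.util.Collections.singletonList("sleep 10"));
                    nmClient.startContainer(c, ctx);
                    launched = true;
                }
                Thread.sleep(1000);
            }

            // Step 4: report success and unregister from the RM.
            rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
        }
    }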

    Some terminology:
    . Application: a job submitted to the framework, for example a MapReduce job. It can also be a shell script.
    . Container: the basic unit of hardware allocation, for example a container with 4 GB of RAM and one CPU. Containers allow fine-grained resource allocation; they replace the fixed map and reduce slots of earlier Hadoop versions.

    [Figure: two clients submit applications to YARN; client 1 submits a shell script, client 2 a MapReduce job]

    The application execution flow:
    A client submits an application to the ResourceManager. In the figure above, client 1 submits a shell-script request and client 2 submits a MapReduce request (a client-side submission sketch follows this list):

    1. The ResourceManager allocates a container for each client application and launches an ApplicationMaster in it; in this example, one ApplicationMaster is started for the shell script and another for the MapReduce application.
    2. On startup, each ApplicationMaster registers its application with the ResourceManager.
    3. Once running, the ApplicationMaster negotiates with the ResourceManager for the resources its application needs.
    4. After the ResourceManager grants the resources, the ApplicationMaster asks the NodeManagers to launch the containers.
    5. Once the containers start, the application code runs inside them, and the ApplicationMaster reports the application's status to the ResourceManager.
    6. While the application runs, the client can query either the ApplicationMaster or the ResourceManager directly for the application status, progress updates, and so on.
    7. On completion of the application, the ApplicationMaster unregisters from the ResourceManager and shuts down its containers.
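
    A minimal client-side sketch of steps 1 and 6, using the YarnClient API; the application name, launch command, and AM container size are illustrative placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.*;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    import org.apache.hadoop.yarn.util.Records;

    public class ClientSketch
    {
        public static void main(String[] args) throws Exception
        {
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(new Configuration());
            yarnClient.start();

            // Ask the RM for a new application id, then describe the AM container.
            YarnClientApplication app = yarnClient.createApplication();
            ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
            appContext.setApplicationName("shell-script-demo"); // illustrative name

            ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
            amContainer.setCommands(java.util.Collections.singletonList("./run.sh")); // illustrative
            appContext.setAMContainerSpec(amContainer);
            appContext.setResource(Resource.newInstance(512, 1)); // 512 MB, 1 vcore for the AM

            // Step 1: submit; the RM will allocate a container and start the AM in it.
            ApplicationId appId = yarnClient.submitApplication(appContext);

            // Step 6: the client can poll the RM for status and progress.
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            System.out.println(appId + " is " + report.getYarnApplicationState());
        }
    }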

    2. Important configuration

    2.1 Important NameNode settings


    The most important NameNode settings live in core-site.xml and hdfs-site.xml.


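    A representative sketch (the property names are the standard ones; the paths and port here are illustrative):

    <!-- core-site.xml: where clients and DataNodes find the NameNode -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

    <!-- hdfs-site.xml: NameNode metadata directory and default block replication -->
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/home/yay/hadoopdata/dfs/name</value> <!-- illustrative path -->
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>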

    2.2 Important DataNode settings

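    The key DataNode setting is where block data is stored; a sketch with an illustrative path:

    <!-- hdfs-site.xml: local directories where the DataNode keeps block data -->
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/home/yay/hadoopdata/dfs/data</value> <!-- illustrative path -->
    </property>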

    2.3 Important ResourceManager settings


    The ResourceManager is responsible for scheduling containers.
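
    A sketch of common yarn-site.xml settings for the ResourceManager (the hostname is illustrative; the scheduler shown is the stock CapacityScheduler):

    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>localhost</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>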

    2.4 Important NodeManager settings


    The NodeManager is responsible for the container life cycle.
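
    A sketch of common yarn-site.xml settings for the NodeManager: how much of the machine it may hand out as containers, plus the shuffle auxiliary service that MapReduce jobs require (the sizes are illustrative):

    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>8192</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>4</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>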

    3. Starting each daemon individually

    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ start-dfs.sh
    ...
    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ stop-dfs.sh
    
    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ hadoop-daemons.sh   --script hdfs start namenode
    
    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ hadoop-daemons.sh   --script hdfs start datanode
    
    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ yarn-daemon.sh start resourcemanager
    
    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ yarn-daemon.sh start nodemanager
    
    yay@yay-ThinkPad-T470-W10DG:~/software/hadoop-3.2.1/sbin$ mr-jobhistory-daemon.sh start historyserver
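
    The hadoop-daemons.sh, yarn-daemon.sh, and mr-jobhistory-daemon.sh scripts still work in Hadoop 3.x but print deprecation warnings; on a single node the non-deprecated equivalents are:

    hdfs --daemon start namenode
    hdfs --daemon start datanode
    yarn --daemon start resourcemanager
    yarn --daemon start nodemanager
    mapred --daemon start historyserver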
    

    4. Web UI addresses

    1. NameNode URL

    <property>
        <name>dfs.http.address</name>
        <!--value>localhost:9870</value-->
        <value>localhost:9871</value>
    </property>
    
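    In Hadoop 3.x the NameNode web UI listens on port 9870 by default, and dfs.http.address is the deprecated alias of dfs.namenode.http-address, so the non-deprecated form of the snippet above would be:

    <property>
        <name>dfs.namenode.http-address</name>
        <value>localhost:9871</value>
    </property>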

    2. ResourceManager URL

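    The ResourceManager web UI listens on port 8088 by default; it is controlled by this yarn-site.xml property:

    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>localhost:8088</value>
    </property>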

    3. MapReduce JobHistory server address

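    The JobHistory server addresses are set in mapred-site.xml; by default the RPC endpoint is on port 10020 and the web UI on port 19888:

    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>localhost:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>localhost:19888</value>
    </property>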

    5. An example

    The code:
    pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.sinovatio</groupId>
      <artifactId>yarn1</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>yarn1</name>
      <url>http://maven.apache.org</url>
    
      <properties>
       <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <hadoop.version>3.0.3</hadoop.version>
       <hive.version>0.13.1</hive.version>
       <hbase.version>0.98.6-hadoop2</hbase.version>
    </properties>
    
      <dependencies>
        <!--dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency-->
        
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <!-- Keep all Hadoop artifacts on one version; mixing 2.x and 3.x client
             jars on the classpath leads to hard-to-debug conflicts. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
      
       <!-- hive client -->
       <!--dependency>
         <groupId>org.apache.hive</groupId>
         <artifactId>hive-jdbc</artifactId>
         <version>${hive.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hive</groupId>
         <artifactId>hive-exec</artifactId>
         <version>${hive.version}</version>
       </dependency-->
      
       <!-- hbase client -->
       <!--dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-server</artifactId>
         <version>${hbase.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-client</artifactId>
         <version>${hbase.version}</version>
       </dependency-->
        
      </dependencies>
    </project>
    

    Mapper

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // org.apache.hadoop.mapreduce Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    public class AnagramMapper extends Mapper<Object, Text, Text, Text>
    {
        private Text sortedText = new Text();
        private Text sourceText = new Text();
        
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
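            // Normalize the line, then emit (word with its letters sorted -> original word);
            // words that produce the same sorted key are anagrams of one another.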
            String line = value.toString().trim().toLowerCase().replaceAll(",", "");
            System.out.println("LINE:" + line);
            StringTokenizer st = new StringTokenizer(line);
            while(st.hasMoreElements())
            {           
                String word = (String) st.nextElement();
                char[] chars = word.toCharArray();
                Arrays.sort(chars);
                String sortedWord = new String(chars);
                sortedText.set(sortedWord);
                sourceText.set(word);
                System.out.println("\torig:" + sourceText + "\tsorted:" + sortedText);
                context.write(sortedText, sourceText);
            }
        }
    }
    

    Reducer

    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class AnagramReducer extends Reducer<Text, Text, Text, Text>
    {
        private Text outputKey = new Text();
        private Text outputVal = new Text();
    
        public void reduce(Text anagramKey, Iterable<Text> anagramVals, Context context)
                throws IOException, InterruptedException
        {
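            // Keys shorter than two characters cannot form interesting anagrams; for the rest,
            // join every word that shares this sorted key into one comma-separated list.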
            String out = "";
            if (anagramKey.toString().length() >= 2) {
                for (Text ana : anagramVals) {
                    out += ana.toString() + ",";
                }
                if (out.length() > 1) {
                    out = out.substring(0, out.length() - 1);
                }
                outputKey.set(anagramKey.toString() + " --> ");
                outputVal.set(out);
                System.out.println("********Writing reducer output: " + anagramKey.toString() + "  --> " + out);
                context.write(outputKey, outputVal);
            }
        }
    }
    
    

    Job:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class App
    {
        public static void main(String[] args) throws Exception
        {
            if (args.length != 2) {
                System.err.println("Usage: Anagram <input path> <output path>");
                System.exit(-1);
            }
            
            Job job = Job.getInstance();
            job.setJarByClass(App.class);
            job.setJobName("Anagram Job");
            
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.setMapperClass(AnagramMapper.class);
            job.setReducerClass(AnagramReducer.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
    

    1. Running locally

    1.1 Running locally from Eclipse


    1.2 Running locally from the command line

    yay@yay-ThinkPad-T470-W10DG:~/eclipse-workspace/yarn1$ hadoop jar target/yarn1-0.0.1-SNAPSHOT.jar com.sinovatio.yarn1.App input2 output
    

    1.3 Running against a pseudo-distributed cluster

    The difference from the local run above is that the input files are uploaded to HDFS and Hadoop uses its pseudo-distributed configuration, while the jar still lives locally; the details are omitted here, but a command sketch follows.
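
    A sketch of the extra steps, assuming the same jar and the input2 directory used above:

    hdfs dfs -mkdir -p input2
    hdfs dfs -put input2/* input2
    hadoop jar target/yarn1-0.0.1-SNAPSHOT.jar com.sinovatio.yarn1.App input2 output
    hdfs dfs -cat output/part-r-00000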

    6. Container configuration

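    The container sizes the scheduler will grant are bounded in yarn-site.xml, and each MapReduce task requests its container size in mapred-site.xml; a sketch with illustrative values (the property names are the standard ones):

    <!-- yarn-site.xml: smallest/largest container the scheduler will allocate -->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>8192</value>
    </property>

    <!-- mapred-site.xml: per-task container requests -->
    <property>
        <name>mapreduce.map.memory.mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>mapreduce.reduce.memory.mb</name>
        <value>2048</value>
    </property>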

    YARN's support for multi-tenant applications:
    1. Think of a residential community with many apartments: each family lives in its own apartment with privacy and security guarantees, while common areas such as the front gate, gardens, and playgrounds are shared.
    2. Applications running in YARN behave similarly: they share the cluster's processing capacity, storage, and data-access security mechanisms. Multi-tenancy is implemented through features such as separate queues, while security and privacy boundaries between tenants come from separate Linux users and separate HDFS files and directories.
