美文网首页
第二章 Yarn基础库

第二章 Yarn基础库

作者: super_wing | 来源:发表于2019-12-11 11:01 被阅读0次

    简介

    与MRv1相比Yarn的设计要复杂得多。YARN借用了MRv1的一些底层基础库(如RPC库),又引入了很多新的设计方式,它的基础库更多,如:google的开源序列化框架Protocol Buffers、Apache Avro、自定义的服务库、事件库和状态机等。

    Yarn基础库是其它Yarn模块的基础,它的设计直接决定了YARN的稳定性和扩展性,概括起来,YARN的基础库主要有以下几个:

    • Protocol Buffers
    • Apache Arvo
    • RPC
    • 服务库和时间库
    • 状态机

    接下来我们将一一介绍。

    Protocol Buffers

    简介

    是Google的开源的序列化库,常有做数据存储,消息通信具有以下优点:

    • 跨平台
    • 高新能,体积小
    • 使用简单
    • 兼容性好

    Protocol Buffers示例

    PersonProtos.proto

    option java_package = "com.thougtworks.demo";
    option java_outer_classname = "PersonProtos";
    
    message Person {
    
        required string name = 1;
        required int32 id = 2;
        optional string email = 3;
        message PhoneNumber {
            required string number = 1;
            required int32 type = 2;
        }
    
        repeated PhoneNumber phone = 4;
    
    }
    

    使用如下命令进行编译

    protoc --java_out=. PersonProtos.proto
    

    最终生成如下代码:


    Apache Arvo

    概念

    它的RPC框架,具有平台无关,支持动态模式(无需编译)等优点。
    目前只用于yarn日志序列化库中,以及MR所有事件采用Arvo做序列化和反序列化。

    示例

    user.avro

    {
        "namespace": "com.thoughtworks.demo.avro",
        "type": "record",
        "name": "User",
        "fields": [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "favorite_number",
                "type": [
                    "int",
                    "null"
                ]
            },
            {
                "name": "favorite_color",
                "type": [
                    "string",
                    "null"
                ]
            }
        ]
    }
    

    使用如下命令进行编译

    java -jar avro-tools-1.8.2.jar compile schema user.avro
    

    Hadoop RPC

    RPC简介

    RPC (Remote Procedure Call) 通过它解决的两个问题:

    • 解决分布式系统中,服务之间的调用问题。
    • 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。

    如何实现远程过程调用?



    如上图, 当我们使用RPC完成一个远程加法调用是如何处理的的呢?

    1. Client端需要调用了远程的一个Calculator的一个实现类的add方法,这种通过远程调用Server端的RPC接口,来获取运算结果,因此称之为Stub。Stub可以认为它是一个代理;

    2. Stub怎么和Server端建立远程通讯呢?这时候就要用到远程通讯类库,也就是图中的Run-time Library,它的而实现有Java的Socket、Apache Arvo、Apache swift等;

    3. Stub通过调用通讯工具提供的方法,和Server建立起了通讯,然后将请求数据发给Server。需要注意的是,由于底层的网络通讯是基于二进制格式的,因此这里Stub传给Run-time Library的数据也必须是二进制,比如calculator.add(1,2),你必须把参数值1和2放到一个Request对象里头(这个Request对象当然不只这些信息,还包括要调用哪个服务的哪个RPC接口等其他信息),然后序列化为二进制,再传给Run-time Library;

    4. 二进制的数据传到Server端,Server端当然也有自己的通讯类库,通过它接收二进制的请求。既然数据是二进制的,那么自然要进行反序列化了,将二进制的数据反序列化为请求对象,然后将这个请求对象交给Server端的Stub处理;

    5. 和之前的Client端的Stub一样,这里的Stub也同样是个“假玩意”,它所负责的,只是去解析请求对象,知道调用方要调的是哪个RPC接口,传进来的参数又是什么,然后再把这些参数传给对应的RPC接口,也就是Calculator的实际实现类去执行;

    6. RPC接口执行完毕,返回执行结果,现在轮到Server端要把数据发给Client了。这里是一样的道理,一样的流程,只是相互的位置进行交换。

    Hadoop RPC

    Hadoop RPC主要对外提供了两个方法:

    1. public static <T> ProtocolProxy<T> getProtocolProxy得到RPC的客户端。
    2. public Builder(Configuration conf) 构造服务端对象。

    Hadoop RPC主要由RPC、Client、Server三大类组成

    1,RPC类分析

    RPC类是对client-server网络模型的封装,为程序提供一套更方便更简洁的编程接口。

    如上图,getProxy和waitForProxy是构建PRC的客户端, stop为销毁方法,set/getProtocolEngine是用来设置或者获得序列化的实现。

    2,Client类分析

    Client主要完成发PRC信息并接收结果。

    如上图, Client内部有两个非常重要的内部类,Call和Connection。

    • Call类,包含5个成员变量,分别是:
      id:唯一标示,标示调用哪个远程的接口
      retry:重试次数
      rpcRequest:序列化后请求信息
      rpcResponse:序列化后返回值信息
      error: 错误信息
      done:是否执行完成的标示
      rpcKind:RPC序列化的类型,目前支持WritableRpcEngine ProtobufRpcEngine

    • Connection类,Client与每个Server之间的一个连接
      包含如下几个主要的变量和方法
      remoteId:client与server连接的唯一标示
      server:server的endpoint
      socket:与Server通信的socket
      in:输入流
      out:输入流
      calls:保存RPC请求的hash table
      addCall:将一个请求添加到hash table中
      sendParam:发送RPC请求
      receiveRespone:接受服务端的RPC请求
      run:调用receiveRespone,会一直等待接受PRC返回的结果

    Client处理流程

    如上图示:

    • 1,创建一个Connection对象,并将远程方法调用信息封装成Call,放到HashTable中。
    • 2,调用sendRpcRequest方法将当前Call对象发送给Server。
    • 3,Server处理预案RPC的请求后,将结果通过网络返回给Client,Client端通过receivePRCRespone函数返回结果。
    • 4,Client检查结果(成功or失败),并将对应Call对象冲HashTable中删除。
    Server类分析
    Reactor模式

    Reactor模式中文叫做反应器模式,是用来解决服务器端高性能并发的问题。Netty、Redis都使其解决高并发的问题。

    示例代码:

    package com.wing.test;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;
    
        Reactor(int port) throws IOException { //Reactor初始化
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            //非阻塞
            serverSocket.configureBlocking(false);
    
            //分步处理,第一步,接收accept事件
            SelectionKey sk =
                    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            //attach callback object, Acceptor
            sk.attach(new Acceptor());
        }
    
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }
    
        void dispatch(SelectionKey k) {
            Runnable r = (Runnable) (k.attachment());
            //调用之前注册的callback对象
            if (r != null) {
                r.run();
            }
        }
    
        // inner class
        class Acceptor implements Runnable {
            public void run() {
                try {
                    SocketChannel channel = serverSocket.accept();
                    if (channel != null)
                        new MthreadHandler(selector, channel);
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
    
            }
        }
    }
    
    package com.wing.test;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    class MthreadHandler implements Runnable
    {
        final SocketChannel channel;
        final SelectionKey selectionKey;
        ByteBuffer input = ByteBuffer.allocate(1024);
        ByteBuffer output = ByteBuffer.allocate(3072);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
    
        ExecutorService pool = Executors.newFixedThreadPool(2);
        static final int PROCESSING = 3;
    
        MthreadHandler(Selector selector, SocketChannel c) throws IOException
        {
            channel = c;
            c.configureBlocking(false);
            // Optionally try first read now
            selectionKey = channel.register(selector, 0);
    
            //将Handler作为callback对象
            selectionKey.attach(this);
    
            //第二步,注册Read就绪事件
            selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        boolean inputIsComplete()
        {
           /* ... */
            return false;
        }
    
        boolean outputIsComplete()
        {
    
           /* ... */
            return false;
        }
    
        void process()
        {
           /* ... */
            return;
        }
    
        public void run()
        {
            try
            {
                if (state == READING)
                {
                    read();
                }
                else if (state == SENDING)
                {
                    send();
                }
            } catch (IOException ex)
            { /* ... */ }
        }
    
    
        synchronized void read() throws IOException
        {
            // ...
            channel.read(input);
            if (inputIsComplete())
            {
                state = PROCESSING;
                //使用线程pool异步执行
                pool.execute(new Processer());
            }
        }
    
        void send() throws IOException
        {
            channel.write(output);
    
            //write完就结束了, 关闭select key
            if (outputIsComplete())
            {
                selectionKey.cancel();
            }
        }
    
        synchronized void processAndHandOff()
        {
            process();
            state = SENDING;
            // or rebind attachment
            //process完,开始等待write就绪
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    
        class Processer implements Runnable
        {
            public void run()
            {
                processAndHandOff();
            }
        }
    
    }
    
    Yarn Server中Reactor模式
    • Reactor: I/O事件的派发者。
    • Acceptor:接受Client的连接,并指派给一个Handler。
    • Handler 与一个Client的通信实体,具体实现业务处理。
    • Reader/Sender: 为加速处理,Reactor模式构建一个存放数据的处理线程的线程池。这样数据读出后或者写入前,立即放到线程池中执行。

    Server端的处理流程是什么样的呢?如下图

    屏幕快照 2019-12-05 15.52.39.png
    1. 接收请求
      接收来自各个客户端的RPC请求,装成Call类,并放到callQueue中,以便进行后续处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。
      整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至Listener决定,当前Listener只是采用了简单的轮询分配机制。
      Listener和Reader线程内部各自包含一个Selector对象,分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新连接;对于Reader线程,主循环的实现体是监听(它负责的那部分)客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。

    2. 处理请求
      该阶段主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。
      Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务

    3. 返回结果
      前面提到,每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。
      Server端仅存在一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。

    4,在Yarn中如何自定义的一个RPC服务
    1. 定义一个PRC协议
    interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
    //版本号,默认情况下,不同版本号的RPC Client和Server之间不能相互通信
        public static final long versionID = 1L;
        String echo(String value) throws IOException;
        int add(int v1, int v2) throws IOException;
    }
    
    1. 实现一个PRC协议
    public static class ClientProtocolImpl implements ClientProtocol {
        //重载的方法,用于获取自定义的协议版本号,
        public long getProtocolVersion(String protocol, long clientVersion) {
          return ClientProtocol.versionID;
        }
        //重载的方法,用于获取协议签名
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
        inthashcode) {
          return new ProtocolSignature(ClientProtocol.versionID, null);
        }
        public String echo(String value) throws IOException {
          return value;
        }
        public int add(int v1, int v2) throws IOException {
          return v1 + v2;
        }
    }
    
    
    1. 构造并启动RPC Server
    Server server = new RPC.Builder(conf)
       .setProtocol(ClientProtocol.class)
       .setInstance(new ClientProtocolImpl())
       .setBindAddress(ADDRESS)
       .setPort(8080)
       .setNumHandlers(5).build();
    server.start();
    
    1. 构造RPC Client并发送RPC请求
    proxy = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, addr, conf);
    int result = proxy.add(5, 6);
    String echoResult = proxy.echo("result");
    

    服务库和事件库

    1,服务库

    YARN对于生命周期较长的对象,采用了基于服务的对象管理模型对其进行管理,该模型主要有以下几个特点:

    • 每个服务对象分为4个状态:
      NOTINITED(被创建)
      INITED(已初始化)
      STARTED(已启动)
      STOPPED(已停止)
    • 任何服务状态变化都可以触发Action。
    • 可通过组合的方式对任意服务进行组合,以便进行统一管理。

    如上图,所有的服务对象最终均实现了接口Service,它定义了最基本的服务初始化、启动、停止等操作,而AbstractService类提供了一个最基本的Service实现。YARN中所有对象,如果是非组合服务,直接继承AbstractService类即可,否则需继承CompositeService。比如ResourceManager。

    事件库

    YARN采用了基于事件驱动的并发模型,该模型能够大大增强并发性,从而提高系统整体性能。为了构建该模型,YARN将各种处理逻辑抽象成事件和对应事件调度器,并将每类事件的处理过程分割成多个步骤,用有限状态机表示。

    如上图,具体过程:处理请求会作为事件进入系统,由中央异步调度器(AsyncDispatcher)负责传递给相应事件调度器(Event Handler)。该事件调度器可能将该事件转发给另外一个事件调度器,也可能交给一个带有有限状态机的事件处理器,其处理结果也以事件的形式输出给中央异步调度器。而新的事件会再次被中央异步调度器转发给下一个事件调度器,直至处理完成(达到终止条件)。

    在YARN中,所有核心服务实际上都是一个中央异步调度器,包括ResourceManager、NodeManager、MRAppMaster等,它们维护了事先注册的事件与事件处理器,并根据接收的事件类型驱动服务的运行。

    当使用YARN事件库时,通常先要定义一个中央异步调度器AsyncDispatcher,负责事件的处理与转发,然后根据实际业务需求定义一系列事件Event与事件处理器EventHandler,并注册到中央异步调度器中以实现事件统一管理和调度。

    AsyncDispatcher UML图

    状态机

    状态机模式

    状态机:由一组状态组成,这些状态分为三类:初始状态、中间状态和最终状态。状态机从初始状态开始运行,经过一系列中间状态后,到达最终状态并退出。在一个状态机中,每个状态都可以接收一组特定事件,并根据具体的事件类型转换到另一个状态。当状态机转换到最终状态时,则退出。

    状态机解决的问题:解决复杂状态下,状态流转过程中的可扩展性,可阅读性。

    如上图是示:

    • Context:环境类
    • State:抽象状态类
    • ConcreteState:具体状态类

    示例:

    public class StateMachineTest {
        public static void main(String[] args) {
            Context context = new ContextImpl();
            context.state(ColorState.WHITE);
            while (context.state().process(context)) ;
        }
    }
    
    public interface Context {
        State state();
        void state(State state);
    }
    
    public class ContextImpl implements Context {
    
        State state;
    
        @Override
        public State state() {
            return state;
        }
    
        @Override
        public void state(State state) {
            this.state = state;
        }
    
    }
    
    interface State {
    
        boolean process(Context context);
    }
    
    
    public enum ColorState implements State {
    
        RED {
            public boolean process(Context context) {
                context.state(ColorState.GREEN);
                System.out.println("Current State = " + this);
                return true;
            }
        },
    
        GREEN {
            public boolean process(Context context) {
                context.state(ColorState.BLACK);
                System.out.println("Current State = " + this);
                return true;
            }
        },
    
        BLACK {
            public boolean process(Context context) {
                context.state(ColorState.YELLOW);
                System.out.println("Current State = " + this);
                return true;
            }
        },
    
        YELLOW {
            public boolean process(Context context) {
                context.state(ColorState.WHITE);
                System.out.println("Current State = " + this);
                return true;
            }
        },
    
        WHITE {
            public boolean process(Context context) {
                context.state(ColorState.BLUE);
                System.out.println("Current State = " + this);
                return true;
            }
        },
    
        BLUE {
            public boolean process(Context context) {
                context.state(ColorState.RED);
                System.out.println("Current State = " + this);
                return false;
            }
        };
    
        public abstract boolean process(Context context);
    }
    
    Yarn状态机

    在Yarn中,每种状态转换由一个四元组表示,分别是转换前状态(preState)、转换后状态(postState)、事件(event)和回调函数(hook)。YARN定义了三种状态转换方式,具体如下:

    • 一个初始状态、一个最终状态、一种事件。该方式表示状态机在preState状态下,接收到Event事件后,执行函数状态转移函数Hook,并在执行完成后将当前状态转换为postState。
    • 一个初始状态、多个最终状态、一种事件。该方式表示状态机在preState状态下,接收到Event事件后,执行函数状态转移函数Hook,并将当前状态转移为函数Hook的返回值所表示的状态。
    • 一个初始状态、一个最终状态、多种事件。该方式表示状态机在preState状态下,接收到Event1、Event2和Event3中的任何一个事件,将执行函数状态转移函数Hook,并在执行完成后将当前状态转换为postState。

    Yarn状态机类的具体实现

    如上图:YARN对外提供了一个状态机工厂StatemachineFactory,它提供多种addTransition方法供用户添加各种状态转移,一旦状态机添加完毕后,可通过调用installTopology完成一个状态机的构建。

    相关文章

      网友评论

          本文标题:第二章 Yarn基础库

          本文链接:https://www.haomeiwen.com/subject/rskogctx.html