美文网首页
Mesos 开发(调度器 & 调度驱动、执行器 &

Mesos 开发(调度器 & 调度驱动、执行器 &

作者: imsilence | 来源:发表于2017-05-04 11:52 被阅读404次

    开发

    mesos扩展

    代码位置: /include/mesos/module

    1. 认证
      见authenticatee.hpp, authorizer.hpp
      用于扩展第三方鉴权机制

    2. 资源分配
      用于扩展资源分配机制,可以扩展资源分配算法和为内建层级分配器实现新的排序算法(用户和框架)

      代码接口详见/include/mesos/master/allocator.hpp(hierarchical.hpp 实现默认的分配算法)

    // Interface that the master uses to drive resource allocation. A
    // concrete allocator is obtained through create() (built-in DRF
    // allocator or one loaded from a module) and is told about
    // frameworks, slaves and resource changes via the methods below.
    class Allocator
    {
    public:
      // Attempts either to create a built-in DRF allocator or to load an
      // allocator instance from a module using the given name. If Try
      // does not report an error, the wrapped Allocator* is not null.
      static Try<Allocator*> create(const std::string& name);
    
      Allocator() {}
    
      // Virtual so that a concrete allocator can be destroyed through an
      // Allocator* such as the one returned by create().
      virtual ~Allocator() {}
    
      // Initializes the allocator.
      //   allocationInterval:   presumably the period between allocation
      //                         rounds (TODO confirm in the implementation).
      //   offerCallback:        called with a framework ID and the
      //                         per-slave resources to offer to it.
      //   inverseOfferCallback: called with a framework ID and the
      //                         per-slave unavailable resources to
      //                         inverse-offer to it.
      //   roles:                role configuration (RoleInfo), presumably
      //                         keyed by role name.
      virtual void initialize(
          const Duration& allocationInterval,
          const lambda::function<
              void(const FrameworkID&,
                   const hashmap<SlaveID, Resources>&)>& offerCallback,
          const lambda::function<
              void(const FrameworkID&,
                   const hashmap<SlaveID, UnavailableResources>&)>&
            inverseOfferCallback,
          const hashmap<std::string, RoleInfo>& roles) = 0;
    
      // Adds a framework to the allocator. `used` gives, per slave, the
      // resources the framework is already using.
      virtual void addFramework(
          const FrameworkID& frameworkId,
          const FrameworkInfo& frameworkInfo,
          const hashmap<SlaveID, Resources>& used) = 0;
    
      // Removes a framework from the allocator.
      virtual void removeFramework(
          const FrameworkID& frameworkId) = 0;
    
      // Activates the given framework.
      // Offers are sent only to activated frameworks.
      virtual void activateFramework(
          const FrameworkID& frameworkId) = 0;
    
      // Deactivates the given framework (counterpart of
      // activateFramework()); a deactivated framework receives no offers.
      virtual void deactivateFramework(
          const FrameworkID& frameworkId) = 0;
    
      // Updates the FrameworkInfo of the given framework.
      virtual void updateFramework(
          const FrameworkID& frameworkId,
          const FrameworkInfo& frameworkInfo) = 0;
    
      // Adds a slave to the allocator.
      //
      // Note that the 'total' resources are passed explicitly because it
      // includes resources that are dynamically "checkpointed" on the
      // slave (e.g. persistent volumes, dynamic reservations, etc). The
      // slaveInfo resources, on the other hand, correspond directly to
      // the static --resources flag value on the slave.
      //
      // `unavailability`, if set, is the slave's next known
      // unavailability (see updateUnavailability()); `used` gives, per
      // framework, the resources already in use on this slave.
      virtual void addSlave(
          const SlaveID& slaveId,
          const SlaveInfo& slaveInfo,
          const Option<Unavailability>& unavailability,
          const Resources& total,
          const hashmap<FrameworkID, Resources>& used) = 0;
    
      // Removes a slave from the allocator.
      virtual void removeSlave(
          const SlaveID& slaveId) = 0;
    
      // Updates a slave's oversubscribed resources.
      //
      // Note that 'oversubscribed' resources include the total amount of
      // oversubscribed resources that are allocated and available.
      // TODO(vinod): Instead of just oversubscribed resources have this
      // method take total resources. We can then reuse this method to
      // update slave's total resources in the future.
      virtual void updateSlave(
          const SlaveID& slave,
          const Resources& oversubscribed) = 0;
    
      // Activates the given slave.
      // Offers are sent only for activated slaves.
      virtual void activateSlave(
          const SlaveID& slaveId) = 0;
    
      // Deactivates the given slave (counterpart of activateSlave()); no
      // offers are sent for a deactivated slave.
      virtual void deactivateSlave(
          const SlaveID& slaveId) = 0;
    
      // Updates the slave whitelist. NOTE(review): the set presumably holds
      // slave hostnames, and an unset Option presumably means "no
      // whitelist" -- confirm against the implementation.
      virtual void updateWhitelist(
          const Option<hashset<std::string>>& whitelist) = 0;
    
      // Invoked when a framework requests resources; `requests` holds the
      // framework's resource requests.
      virtual void requestResources(
          const FrameworkID& frameworkId,
          const std::vector<Request>& requests) = 0;
    
      // Updates the allocation of the given framework on the given slave
      // according to the given offer operations.
      virtual void updateAllocation(
          const FrameworkID& frameworkId,
          const SlaveID& slaveId,
          const std::vector<Offer::Operation>& operations) = 0;
    
      // NOTE(review): presumably applies the offer operations to the
      // slave's available (unallocated) resources, with the returned
      // Future reporting whether they could be applied -- confirm against
      // the implementation.
      virtual process::Future<Nothing> updateAvailable(
          const SlaveID& slaveId,
          const std::vector<Offer::Operation>& operations) = 0;
    
      // We currently support storing the next unavailability, if there is one, per
      // slave. If `unavailability` is not set then there is no known upcoming
      // unavailability. This might require the implementation of the function to
      // remove any inverse offers that are outstanding.
      virtual void updateUnavailability(
          const SlaveID& slaveId,
          const Option<Unavailability>& unavailability) = 0;
    
      // Informs the allocator that the inverse offer has been responded to or
      // revoked. If `status` is not set then the inverse offer was not responded
      // to, possibly because the offer timed out or was rescinded. This might
      // require the implementation of the function to remove any inverse offers
      // that are outstanding. The `unavailableResources` can be used by the
      // allocator to distinguish between different inverse offers sent to the same
      // framework for the same slave.
      //
      // Note: `filters` has a default argument on a virtual function. Default
      // arguments are bound by the static type at the call site, so overrides
      // should repeat the same default.
      virtual void updateInverseOffer(
          const SlaveID& slaveId,
          const FrameworkID& frameworkId,
          const Option<UnavailableResources>& unavailableResources,
          const Option<InverseOfferStatus>& status,
          const Option<Filters>& filters = None()) = 0;
    
      // Retrieves the status of all inverse offers maintained by the allocator.
      virtual process::Future<
          hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>>
        getInverseOfferStatuses() = 0;
    
      // Informs the Allocator to recover resources that are considered
      // used by the framework (i.e. give them back to the pool for the
      // given slave). `filters`, if set, are applied to the recovered
      // resources for this framework.
      virtual void recoverResources(
          const FrameworkID& frameworkId,
          const SlaveID& slaveId,
          const Resources& resources,
          const Option<Filters>& filters) = 0;
    
      // Whenever a framework that has filtered resources wants to revive
      // offers for those resources the master invokes this callback, so
      // the framework can again receive offers it previously filtered out.
      virtual void reviveOffers(
          const FrameworkID& frameworkId) = 0;
      
      // Informs the allocator to stop sending resources for the framework
      // (undone by reviveOffers()).
      virtual void suppressOffers(
          const FrameworkID& frameworkId) = 0;
    };
    

    使用: --allocator指定资源分配模块

    代码详见: /src/master/allocator/sorter/sorter.hpp(drf/sorter.hpp中实现公平分配并支持带权值优先级)
    
    // Interface used by the allocator to order its clients (users or
    // frameworks) for allocation according to a policy. It tracks each
    // client's allocation and the total pool of resources.
    class Sorter
    {
    public:
      // Virtual so that a concrete sorter can be destroyed through a
      // Sorter*.
      virtual ~Sorter() {}
    
      // Adds a client to allocate resources to. A client
      // may be a user or a framework. `weight` defaults to 1; note the
      // default argument is on a virtual function, so it is bound by the
      // static type at the call site.
      virtual void add(const std::string& client, double weight = 1) = 0;
    
      // Removes a client from the sorter.
      virtual void remove(const std::string& client) = 0;
    
      // Readds a client to the sort after deactivate().
      virtual void activate(const std::string& client) = 0;
    
      // Removes a client from the sort, so it won't get allocated to.
      // Unlike remove(), the client is still reported by contains().
      virtual void deactivate(const std::string& client) = 0;
    
      // Specify that resources have been allocated to the given client.
      virtual void allocated(
          const std::string& client,
          const SlaveID& slaveId,
          const Resources& resources) = 0;
    
      // Updates a portion of the allocation for the client, in order to
      // augment the resources with additional metadata (e.g., volumes)
      // This means that the new allocation must not affect the static
      // roles, or the overall quantities of resources!
      virtual void update(
          const std::string& client,
          const SlaveID& slaveId,
          const Resources& oldAllocation,
          const Resources& newAllocation) = 0;
    
      // Specify that resources have been unallocated from the given client.
      virtual void unallocated(
          const std::string& client,
          const SlaveID& slaveId,
          const Resources& resources) = 0;
    
      // Returns the resources that have been allocated to this client,
      // per slave.
      virtual hashmap<SlaveID, Resources> allocation(const std::string& client) = 0;
    
      // Returns the clients that have allocations on this slave, with
      // their allocated resources.
      virtual hashmap<std::string, Resources> allocation(
          const SlaveID& slaveId) = 0;
    
      // Returns the given slave's resources that have been allocated to
      // this client.
      virtual Resources allocation(
          const std::string& client,
          const SlaveID& slaveId) = 0;
    
      // Add resources to the total pool of resources this
      // Sorter should consider.
      virtual void add(const SlaveID& slaveId, const Resources& resources) = 0;
    
      // Remove resources from the total pool.
      virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0;
    
      // Updates the total pool of resources with the given slave's
      // resources.
      virtual void update(const SlaveID& slaveId, const Resources& resources) = 0;
    
      // Returns a list of all clients, in the order that they
      // should be allocated to, according to this Sorter's policy.
      virtual std::list<std::string> sort() = 0;
    
      // Returns true if this Sorter contains the specified client,
      // either active or deactivated.
      virtual bool contains(const std::string& client) = 0;
    
      // Returns the number of clients this Sorter contains,
      // either active or deactivated.
      virtual int count() = 0;
    };
    
    3. 隔离
      见isolator.hpp
      提供新的隔离方式和监控机制

    4. 匿名
      见anonymous.hpp
      在master和slave启动时被加载, 与父进程共存, 不会扩展或替代mesos已有的功能

    5. hook
      见/include/mesos/hook.hpp
      通过hook扩展组件的功能

      使用--hooks选项设置hook列表

    在启动master和slave时, 通过参数--modules指定一个json文件来设置模块的加载及配置, json文件格式如下:

    {
        "libraries" : [
            {
                "file" : "",
                "name" : "",
                "modules" : [
                    {
                        "name" : "",
                        "parameters" : [
                            {
                                "key" : "",
                                "value" : ""
                            }
                        ]
                    }
                ]
            }
        ]
    }
    

    mesos开发

    消息
    mesos各组件之间使用protocol buffer定义发送的消息,所有消息格式见代码/include/mesos/mesos.proto

    框架
    + 调度器 & 调度驱动
    负责管理框架所获得的资源, 代码见/include/mesos/scheduler.hpp

    class Scheduler
    {
    public:
      // Empty virtual destructor (necessary to instantiate subclasses).
      virtual ~Scheduler() {}
    
      //mesos进行注册时被回调
      // Invoked when the scheduler successfully registers with a Mesos
      // master. A unique ID (generated by the master) used for
      // distinguishing this framework from others and MasterInfo with the
      // ip and port of the current master are provided as arguments.
      virtual void registered(
          SchedulerDriver* driver,
          const FrameworkID& frameworkId,
          const MasterInfo& masterInfo) = 0;
    
      //当重新选举mesos master后,被回调重新进行注册
      // Invoked when the scheduler re-registers with a newly elected
      // Mesos master. This is only called when the scheduler has
      // previously been registered. MasterInfo containing the updated
      // information about the elected master is provided as an argument.
      virtual void reregistered(
          SchedulerDriver* driver,
          const MasterInfo& masterInfo) = 0;
    
      //在调度器和master被断开时被回调
      // Invoked when the scheduler becomes "disconnected" from the master
      // (e.g., the master fails and another is taking over).
      virtual void disconnected(SchedulerDriver* driver) = 0;
    
      //在master向framework提供资源offer时调用
      // Invoked when resources have been offered to this framework. A
      // single offer will only contain resources from a single slave.
      // Resources associated with an offer will not be re-offered to
      // _this_ framework until either (a) this framework has rejected
      // those resources (see SchedulerDriver::launchTasks) or (b) those
      // resources have been rescinded (see Scheduler::offerRescinded).
      // Note that resources may be concurrently offered to more than one
      // framework at a time (depending on the allocator being used). In
      // that case, the first framework to launch tasks using those
      // resources will be able to use them while the other frameworks
      // will have those resources rescinded (or if a framework has
      // already launched tasks with those resources then those tasks will
      // fail with a TASK_LOST status and a message saying as much).
      virtual void resourceOffers(
          SchedulerDriver* driver,
          const std::vector<Offer>& offers) = 0;
    
      /根据不同的分配器,可能将一个资源分配给多个框架,但是第一个响应master的framework会得到资源,其他framework会被回调表示master撤销某资源offer, 若framework在收到该请求之前已经返回taskinfo,则在task状态中将受到taskstatus为lost状态
      // Invoked when an offer is no longer valid (e.g., the slave was
      // lost or another framework used resources in the offer). If for
      // whatever reason an offer is never rescinded (e.g., dropped
      // message, failing over framework, etc.), a framework that attempts
      // to launch tasks using an invalid offer will receive TASK_LOST
      // status updates for those tasks (see Scheduler::resourceOffers).
      virtual void offerRescinded(
          SchedulerDriver* driver,
          const OfferID& offerId) = 0;
    
      //任务状态发生变化回调
      // Invoked when the status of a task has changed (e.g., a slave is
      // lost and so the task is lost, a task finishes and an executor
      // sends a status update saying so, etc). If implicit
      // acknowledgements are being used, then returning from this
      // callback _acknowledges_ receipt of this status update! If for
      // whatever reason the scheduler aborts during this callback (or
      // the process exits) another status update will be delivered (note,
      // however, that this is currently not true if the slave sending the
      // status update is lost/fails during that time). If explicit
      // acknowledgements are in use, the scheduler must acknowledge this
      // status on the driver.
      virtual void statusUpdate(
          SchedulerDriver* driver,
          const TaskStatus& status) = 0;
    
      //向调度器传递执行器发送的消息,调度器可以访问执行器和slavede Id, 以及调度器所发送的数据
      // Invoked when an executor sends a message. These messages are best
      // effort; do not expect a framework message to be retransmitted in
      // any reliable fashion.
      virtual void frameworkMessage(
          SchedulerDriver* driver,
          const ExecutorID& executorId,
          const SlaveID& slaveId,
          const std::string& data) = 0;
    
      //当slave丢失时回调
      // Invoked when a slave has been determined unreachable (e.g.,
      // machine failure, network partition). Most frameworks will need to
      // reschedule any tasks launched on this slave on a new slave.
      virtual void slaveLost(
          SchedulerDriver* driver,
          const SlaveID& slaveId) = 0;
    
      //执行器丢失是回调
      // Invoked when an executor has exited/terminated. Note that any
      // tasks running will have TASK_LOST status updates automagically
      // generated.
      virtual void executorLost(
          SchedulerDriver* driver,
          const ExecutorID& executorId,
          const SlaveID& slaveId,
          int status) = 0;
    
      //当发送错误时调用,常用于清理工作
      // Invoked when there is an unrecoverable error in the scheduler or
      // scheduler driver. The driver will be aborted BEFORE invoking this
      // callback.
      virtual void error(
          SchedulerDriver* driver,
          const std::string& message) = 0;
    };
    
    
    // Abstract interface for connecting a scheduler to Mesos. This
    // interface is used both to manage the scheduler's lifecycle (start
    // it, stop it, or wait for it to finish) and to interact with Mesos
    // (e.g., launch tasks, kill tasks, etc.). See MesosSchedulerDriver
    // below for a concrete example of a SchedulerDriver.
    class SchedulerDriver
    {
    public:
      // Empty virtual destructor (necessary to instantiate subclasses).
      // It is expected that 'stop()' is called before this is called.
      virtual ~SchedulerDriver() {}
    
      // Starts the scheduler driver. This needs to be called before any
      // other driver calls are made.
      virtual Status start() = 0;
    
      // Stops the scheduler driver. If the 'failover' flag is set to
      // false then it is expected that this framework will never
      // reconnect to Mesos. So Mesos will unregister the framework and
      // shutdown all its tasks and executors. If 'failover' is true, all
      // executors and tasks will remain running (for some framework
      // specific failover timeout) allowing the scheduler to reconnect
      // (possibly in the same process, or from a different process, for
      // example, on a different machine).
      virtual Status stop(bool failover = false) = 0;
    
      // Aborts the driver so that no more callbacks can be made to the
      // scheduler. The semantics of abort and stop have deliberately been
      // separated so that code can detect an aborted driver (i.e., via
      // the return status of SchedulerDriver::join, see below), and
      // instantiate and start another driver if desired (from within the
      // same process). Note that 'stop()' is not automatically called
      // inside 'abort()'.
      virtual Status abort() = 0;
    
      // Waits for the driver to be stopped or aborted, possibly
      // _blocking_ the current thread indefinitely. The return status of
      // this function can be used to determine if the driver was aborted
      // (see mesos.proto for a description of Status).
      virtual Status join() = 0;
    
      // Starts and immediately joins (i.e., blocks on) the driver; i.e.
      // start() followed by join().
      virtual Status run() = 0;
    
      // Requests resources from Mesos (see mesos.proto for a description
      // of Request and how, for example, to request resources from
      // specific slaves). Any resources available are offered to the
      // framework via Scheduler::resourceOffers callback, asynchronously.
      virtual Status requestResources(const std::vector<Request>& requests) = 0;
    
      // Launches the given set of tasks. Any resources remaining (i.e.,
      // not used by the tasks or their executors) will be considered
      // declined. The specified filters are applied on all unused
      // resources (see mesos.proto for a description of Filters).
      // Available resources are aggregated when multiple offers are
      // provided. Note that all offers must belong to the same slave.
      // Invoking this function with an empty collection of tasks declines
      // offers in their entirety (see SchedulerDriver::declineOffer).
      virtual Status launchTasks(
          const std::vector<OfferID>& offerIds,
          const std::vector<TaskInfo>& tasks,
          const Filters& filters = Filters()) = 0;
    
      
      // Single-offer variant of launchTasks().
      // DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
      virtual Status launchTasks(
          const OfferID& offerId,
          const std::vector<TaskInfo>& tasks,
          const Filters& filters = Filters()) = 0;
    
      // Kills the specified task. Note that attempting to kill a task is
      // currently not reliable. If, for example, a scheduler fails over
      // while it was attempting to kill a task it will need to retry in
      // the future. Likewise, if unregistered / disconnected, the request
      // will be dropped (these semantics may be changed in the future).
      virtual Status killTask(const TaskID& taskId) = 0;
    
      // Accepts the given offers and performs a sequence of operations on
      // those accepted offers. See Offer.Operation in mesos.proto for the
      // set of available operations. Available resources are aggregated
      // when multiple offers are provided. Note that all offers must
      // belong to the same slave. Any unused resources will be considered
      // declined. The specified filters are applied on all unused
      // resources (see mesos.proto for a description of Filters).
      virtual Status acceptOffers(
          const std::vector<OfferID>& offerIds,
          const std::vector<Offer::Operation>& operations,
          const Filters& filters = Filters()) = 0;
    
      // Declines an offer in its entirety and applies the specified
      // filters on the resources (see mesos.proto for a description of
      // Filters). Note that this can be done at any time, it is not
      // necessary to do this within the Scheduler::resourceOffers
      // callback.
      virtual Status declineOffer(
          const OfferID& offerId,
          const Filters& filters = Filters()) = 0;
    
      // Removes all filters previously set by the framework (via
      // launchTasks(), and likewise the Filters passed to acceptOffers()
      // or declineOffer()). This enables the framework to receive offers
      // from those filtered slaves.
      virtual Status reviveOffers() = 0;
    
      // Inform Mesos master to stop sending offers to the framework. The
      // scheduler should call reviveOffers() to resume getting offers.
      virtual Status suppressOffers() = 0;
    
      // Acknowledges the status update. This should only be called
      // once the status update is processed durably by the scheduler.
      // Note that explicit acknowledgements must be requested via the
      // constructor argument, otherwise a call to this method will
      // cause the driver to crash.
      virtual Status acknowledgeStatusUpdate(
          const TaskStatus& status) = 0;
    
      // Sends a message from the framework to one of its executors. These
      // messages are best effort; do not expect a framework message to be
      // retransmitted in any reliable fashion.
      virtual Status sendFrameworkMessage(
          const ExecutorID& executorId,
          const SlaveID& slaveId,
          const std::string& data) = 0;
    
      // Allows the framework to query the status for non-terminal tasks.
      // This causes the master to send back the latest task status for
      // each task in 'statuses', if possible. Tasks that are no longer
      // known will result in a TASK_LOST update. If statuses is empty,
      // then the master will send the latest status for each task
      // currently known.
      virtual Status reconcileTasks(
          const std::vector<TaskStatus>& statuses) = 0;
    };
    
    
    // Concrete implementation of a SchedulerDriver that connects a
    // Scheduler with a Mesos master. The MesosSchedulerDriver is
    // thread-safe.
    //
    // Note that scheduler failover is supported in Mesos. After a
    // scheduler is registered with Mesos it may failover (to a new
    // process on the same machine or across multiple machines) by
    // creating a new driver with the ID given to it in
    // Scheduler::registered.
    //
    // The driver is responsible for invoking the Scheduler callbacks as
    // it communicates with the Mesos master.
    //
    // Note that blocking on the MesosSchedulerDriver (e.g., via
    // MesosSchedulerDriver::join) doesn't affect the scheduler callbacks
    // in any way because they are handled by a different thread.
    //
    // Note that the driver uses GLOG to do its own logging. GLOG flags
    // can be set via environment variables, prefixing the flag name with
    // "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
    // src/logging/flags.hpp. Mesos flags can also be set via environment
    // variables, prefixing the flag name with "MESOS_", e.g.,
    // "MESOS_QUIET=1".
    //
    // See src/examples/test_framework.cpp for an example of using the
    // MesosSchedulerDriver.
    class MesosSchedulerDriver : public SchedulerDriver
    {
    public:
      // Creates a new driver for the specified scheduler. The master
      // should be one of:
      //
      //     host:port
      //     zk://host1:port1,host2:port2,.../path
      //     zk://username:password@host1:port1,host2:port2,.../path
      //     file:///path/to/file (where file contains one of the above)
      //
      // The driver will attempt to "failover" if the specified
      // FrameworkInfo includes a valid FrameworkID.
      //
      // Any Mesos configuration options are read from environment
      // variables, as well as any configuration files found through the
      // environment variables.
      //
      // TODO(vinod): Deprecate this once 'MesosSchedulerDriver' can take
      // 'Option<Credential>' as parameter. Currently it cannot because
      // 'stout' is not visible from here.
      MesosSchedulerDriver(
          Scheduler* scheduler,
          const FrameworkInfo& framework,
          const std::string& master);
    
      // Same as the above constructor but takes 'credential' as argument.
      // The credential will be used for authenticating with the master.
      MesosSchedulerDriver(
          Scheduler* scheduler,
          const FrameworkInfo& framework,
          const std::string& master,
          const Credential& credential);
    
      // These constructors are the same as the above two, but allow
      // the framework to specify whether implicit or explicit
      // acknowledgements are desired. See statusUpdate() for the
      // details about explicit acknowledgements.
      //
      // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
      // these new constructors are exposed.
      MesosSchedulerDriver(
          Scheduler* scheduler,
          const FrameworkInfo& framework,
          const std::string& master,
          bool implicitAcknowledgements);
    
      // NOTE(review): "implicitAcknowlegements" (missing 'd') matches the
      // spelling of the private member below; the member name is kept
      // because the out-of-view implementation refers to it.
      MesosSchedulerDriver(
          Scheduler* scheduler,
          const FrameworkInfo& framework,
          const std::string& master,
          bool implicitAcknowlegements,
          const Credential& credential);
    
      // This destructor will block indefinitely if
      // MesosSchedulerDriver::start was invoked successfully (possibly
      // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
      // not been invoked.
      virtual ~MesosSchedulerDriver();
    
      // See SchedulerDriver for descriptions of these.
      virtual Status start();
      virtual Status stop(bool failover = false);
      virtual Status abort();
      virtual Status join();
      virtual Status run();
    
      virtual Status requestResources(
          const std::vector<Request>& requests);
    
      // TODO(nnielsen): launchTasks using single offer is deprecated.
      // Use launchTasks with offer list instead.
      virtual Status launchTasks(
          const OfferID& offerId,
          const std::vector<TaskInfo>& tasks,
          const Filters& filters = Filters());
    
      virtual Status launchTasks(
          const std::vector<OfferID>& offerIds,
          const std::vector<TaskInfo>& tasks,
          const Filters& filters = Filters());
    
      virtual Status killTask(const TaskID& taskId);
    
      virtual Status acceptOffers(
          const std::vector<OfferID>& offerIds,
          const std::vector<Offer::Operation>& operations,
          const Filters& filters = Filters());
    
      virtual Status declineOffer(
          const OfferID& offerId,
          const Filters& filters = Filters());
    
      virtual Status reviveOffers();
    
      virtual Status suppressOffers();
    
      virtual Status acknowledgeStatusUpdate(
          const TaskStatus& status);
    
      virtual Status sendFrameworkMessage(
          const ExecutorID& executorId,
          const SlaveID& slaveId,
          const std::string& data);
    
      virtual Status reconcileTasks(
          const std::vector<TaskStatus>& statuses);
    
    protected:
      // Used to detect (i.e., choose) the master.
      internal::MasterDetector* detector;
    
    private:
      // Shared setup for the constructors (definition not visible here).
      void initialize();
    
      // Callback target and registration parameters supplied to the
      // constructor.
      Scheduler* scheduler;
      FrameworkInfo framework;
      std::string master;
    
      // Used for communicating with the master.
      // NOTE(review): raw pointers (here and `detector`, `latch`,
      // `credential`); their ownership and cleanup live in the
      // implementation file -- confirm there.
      internal::SchedulerProcess* process;
    
      // URL for the master (e.g., zk://, file://, etc).
      std::string url;
    
      // Mutex for enforcing serial execution of all non-callbacks.
      // (Being non-copyable, it also makes the driver non-copyable.)
      std::recursive_mutex mutex;
    
      // Latch for waiting until driver terminates.
      process::Latch* latch;
    
      // Current status of the driver.
      Status status;
    
      // Whether status updates are acknowledged implicitly (see
      // Scheduler::statusUpdate and acknowledgeStatusUpdate()).
      const bool implicitAcknowlegements;
    
      // Credential used to authenticate with the master, if one was given.
      const Credential* credential;
    
      // Scheduler process ID.
      std::string schedulerId;
    };
    
    + 执行器 & 执行驱动
        负责启动并执行调度器分配的任务, 代码见/include/mesos/executor.hpp
    
    class Executor
    {
    public:
      // Empty virtual destructor (necessary to instantiate subclasses).
      virtual ~Executor() {}
    
      //在执行驱动器执行成功后和slave连接后调用
      // Invoked once the executor driver has been able to successfully
      // connect with Mesos. In particular, a scheduler can pass some
      // data to its executors through the FrameworkInfo.ExecutorInfo's
      // data field.
      virtual void registered(
          ExecutorDriver* driver,
          const ExecutorInfo& executorInfo,
          const FrameworkInfo& frameworkInfo,
          const SlaveInfo& slaveInfo) = 0;
    
      //向重启的slave重新注册
      // Invoked when the executor re-registers with a restarted slave.
      virtual void reregistered(
          ExecutorDriver* driver,
          const SlaveInfo& slaveInfo) = 0;
    
      //在执行器与slave断开连接时调用
      // Invoked when the executor becomes "disconnected" from the slave
      // (e.g., the slave is being restarted due to an upgrade).
      virtual void disconnected(ExecutorDriver* driver) = 0;
    
      //在任务在当前执行器上启动时被调用
      // Invoked when a task has been launched on this executor (initiated
      // via Scheduler::launchTasks). Note that this task can be realized
      // with a thread, a process, or some simple computation, however, no
      // other callbacks will be invoked on this executor until this
      // callback has returned.
      virtual void launchTask(
          ExecutorDriver* driver,
          const TaskInfo& task) = 0;
    
      //当任务被kill时调用
      // Invoked when a task running within this executor has been killed
      // (via SchedulerDriver::killTask). Note that no status update will
      // be sent on behalf of the executor, the executor is responsible
      // for creating a new TaskStatus (i.e., with TASK_KILLED) and
      // invoking ExecutorDriver::sendStatusUpdate.
      virtual void killTask(
          ExecutorDriver* driver,
          const TaskID& taskId) = 0;
    
      //当接到框架执行器发送的消息到达时被调用
      // Invoked when a framework message has arrived for this executor.
      // These messages are best effort; do not expect a framework message
      // to be retransmitted in any reliable fashion.
      virtual void frameworkMessage(
          ExecutorDriver* driver,
          const std::string& data) = 0;
    
      // Invoked when the executor should terminate all of its currently
      // running tasks. Once Mesos has determined that an executor has
      // terminated, a TASK_LOST status update will be created for every
      // task the executor did not send a terminal status update for
      // (e.g., TASK_KILLED, TASK_FINISHED, TASK_FAILED, etc.).
      virtual void shutdown(ExecutorDriver* driver) = 0;
    
      // Invoked when a fatal error has occurred in the executor and/or the
      // executor driver. The driver is aborted BEFORE this callback is
      // invoked, so the driver must not be used to recover from here.
      //
      // message: human-readable description of the error.
      virtual void error(
          ExecutorDriver* driver,
          const std::string& message) = 0;
    };
    
    
    // Abstract interface for connecting an executor to Mesos. This
    // interface is used both to manage the executor's lifecycle (start
    // it, stop it, or wait for it to finish) and to interact with Mesos
    // (e.g., send status updates, send framework messages, etc.). See
    // MesosExecutorDriver below for a concrete example of an
    // ExecutorDriver.
    class ExecutorDriver
    {
    public:
      // Empty virtual destructor: virtual so that destroying a concrete
      // driver through an ExecutorDriver* also runs the derived destructor.
      virtual ~ExecutorDriver() {}
    
      // Starts (initializes) the executor driver. This needs to be called
      // before any other driver calls are made.
      virtual Status start() = 0;
    
      // Stops the executor driver and cleans it up.
      virtual Status stop() = 0;
    
      // Aborts the driver so that no more callbacks can be made to the
      // executor. This is an action the caller takes, not a callback that
      // fires on abnormal exit. The semantics of abort and stop have
      // deliberately been separated so that code can detect an aborted
      // driver (i.e., via the return status of ExecutorDriver::join, see
      // below), and instantiate and start another driver if desired (from
      // within the same process ... although this functionality is
      // currently not supported for executors).
      virtual Status abort() = 0;
    
      // Waits for the driver to be stopped or aborted, possibly
      // _blocking_ the current thread indefinitely. The return status of
      // this function can be used to determine if the driver was aborted
      // (see mesos.proto for a description of Status).
      virtual Status join() = 0;
    
      // Starts the driver and then immediately joins (i.e., blocks on) it.
      virtual Status run() = 0;
    
      // Sends a task status update to the framework scheduler, retrying as
      // necessary until an acknowledgement has been received or the
      // executor is terminated (in which case, a TASK_LOST status update
      // will be sent). See Scheduler::statusUpdate for more information
      // about status update acknowledgements.
      virtual Status sendStatusUpdate(const TaskStatus& status) = 0;
    
      // Sends a message to the framework scheduler. These messages are
      // best effort; do not expect a framework message to be
      // retransmitted in any reliable fashion.
      virtual Status sendFrameworkMessage(const std::string& data) = 0;
    };
    
    
    // Concrete implementation of an ExecutorDriver that connects an
    // Executor with a Mesos slave. The MesosExecutorDriver is
    // thread-safe.
    //
    // The driver is responsible for invoking the Executor callbacks as it
    // communicates with the Mesos slave.
    //
    // Note that blocking on the MesosExecutorDriver (e.g., via
    // MesosExecutorDriver::join) doesn't affect the executor callbacks in
    // any way because they are handled by a different thread.
    //
    // Note that the driver uses GLOG to do its own logging. GLOG flags
    // can be set via environment variables, prefixing the flag name with
    // "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
    // src/logging/flags.hpp. Mesos flags can also be set via environment
    // variables, prefixing the flag name with "MESOS_", e.g.,
    // "MESOS_QUIET=1".
    //
    // See src/examples/test_executor.cpp for an example of using the
    // MesosExecutorDriver.
    class MesosExecutorDriver : public ExecutorDriver
    {
    public:
      // Creates a new driver that uses the specified Executor. Note, the
      // executor pointer must outlive the driver.
      explicit MesosExecutorDriver(Executor* executor);
    
      // This destructor will block indefinitely if
      // MesosExecutorDriver::start was invoked successfully (possibly via
      // MesosExecutorDriver::run) and MesosExecutorDriver::stop has not
      // been invoked.
      virtual ~MesosExecutorDriver();
    
      // See ExecutorDriver for descriptions of these.
      virtual Status start();
      virtual Status stop();
      virtual Status abort();
      virtual Status join();
      virtual Status run();
      virtual Status sendStatusUpdate(const TaskStatus& status);
      virtual Status sendFrameworkMessage(const std::string& data);
    
    private:
      friend class internal::ExecutorProcess;
    
      // User-supplied callbacks. Per the constructor contract it must
      // outlive the driver; presumably not owned by the driver (confirm in
      // the .cpp).
      Executor* executor;
    
      // Libprocess process for communicating with slave.
      internal::ExecutorProcess* process;
    
      // Mutex for enforcing serial execution of all non-callbacks.
      std::recursive_mutex mutex;
    
      // Latch for waiting until driver terminates.
      process::Latch* latch;
    
      // Current status of the driver.
      Status status;
    };
    
    + 启动器
       用于启动调度器驱动
    

    framework调度器示例:

    #!/usr/bin/env python
    #encoding: utf-8
    
    import Queue
    import logging
    import threading
    import time
    
    
    from pesos.scheduler import PesosSchedulerDriver
    from pesos.vendor.mesos import mesos_pb2
    
    from mesos.interface import Scheduler
    
    
    _logger = logging.getLogger(__name__)
    
    class TestScheduler(Scheduler):
        """Example scheduler that launches queued ``ping`` tasks on offers.

        Work items are ``(executor_id, task_id, args)`` tuples (integer ids,
        ``args[0]`` is the host to ping) taken from a queue. As many tasks as
        fit are packed into each resource offer. The driver is stopped once
        as many terminal status updates as queued tasks have been received.
        """
    
        # Scalar resources requested by every task and checked against offers.
        TASK_CPU = 0.1
        TASK_MEM = 2
    
        def __init__(self, queue):
            """Create the scheduler.

            Args:
                queue: a ``Queue.Queue`` of ``(executor_id, task_id, args)``
                    tuples to launch.
            """
            self.tasks = queue
            # Number of status updates received that carried a terminal state.
            self.terminal = 0
            # Work size captured at construction; the driver is stopped when
            # ``terminal`` reaches this value.
            self.total_tasks = queue.qsize()
    
        def registered(self, driver, frameworkId, masterInfo):
            """Log the id assigned to this framework at registration."""
            _logger.info('Registered framework %s', frameworkId)
    
        def reregistered(self, driver, masterInfo):
            """Log the master the framework re-registered with."""
            _logger.info('Connected with master %s', masterInfo.ip)
    
        def disconnected(self, driver):
            """Log loss of the master connection."""
            _logger.info('Disconnected from master')
    
        def resourceOffers(self, driver, offers):
            """Accept or decline ``offers`` on a background thread.

            Each offer is either used to launch as many queued tasks as its
            cpus/mem allow, or declined. It is declined when no work is left,
            or when it cannot fit even a single task. This callback returns
            immediately; the handling runs in a newly started thread.
            """
            _logger.info('Recived %s offers', len(offers))
    
            def handle_offers():
                declined = []
    
                for offer in offers:
                    offer_cpu = 0
                    offer_mem = 0
    
                    if self.tasks.empty():
                        declined.append(offer.id.value)
                        continue
    
                    for resource in offer.resources:
                        if resource.name == 'cpus':
                            offer_cpu = resource.scalar.value
                        if resource.name == 'mem':
                            offer_mem = resource.scalar.value
    
                    _logger.info('offer:%s, cpu:%s, mem:%s', offer.id.value, offer_cpu, offer_mem)
                    tasks = []
    
                    while offer_mem >= self.TASK_MEM and offer_cpu >= self.TASK_CPU:
                        # Several handler threads may drain the queue at the
                        # same time. Checking empty() and then calling a
                        # blocking get() could hang forever once another thread
                        # takes the last item. get_nowait() takes-or-fails
                        # atomically.
                        try:
                            executor_id, task_id, args = self.tasks.get_nowait()
                        except Queue.Empty:
                            break
                        self.tasks.task_done()
                        offer_cpu -= self.TASK_CPU
                        offer_mem -= self.TASK_MEM
    
                        _logger.info('Queue task %s:%s', executor_id, task_id)
                        tasks.append(self._build_task(offer, executor_id, task_id, args))
    
                    if tasks:
                        driver.launch_tasks([offer.id.value], tasks)
                    else:
                        # Too small for even one task (or the queue was drained
                        # concurrently). Decline it instead of leaving it
                        # outstanding and holding its resources.
                        declined.append(offer.id.value)
    
                for offerid in declined:
                    driver.decline_offer(offerid)
    
            th = threading.Thread(target=handle_offers)
            th.start()
    
        def _build_task(self, offer, executor_id, task_id, args):
            """Build a TaskInfo that runs ``ping <args[0]> -c 20`` on ``offer``'s slave.

            The task requests TASK_CPU cpus and TASK_MEM mem. Its id is
            ``"<executor_id>:<task_id>"``, and it carries an ExecutorInfo whose
            id is ``str(executor_id)`` and whose command is the ping invocation.
            """
            task = mesos_pb2.TaskInfo()
            task.name = "Test Task of Silence"
    
            cpus = task.resources.add()
            cpus.name = "cpus"
            cpus.type = mesos_pb2.Value.SCALAR
            cpus.scalar.value = self.TASK_CPU
    
            mem = task.resources.add()
            mem.name = "mem"
            mem.type = mesos_pb2.Value.SCALAR
            mem.scalar.value = self.TASK_MEM
    
            task.executor.command.value = "ping %s -c 20" % args[0]
    
            # Not implemented yet; sketches kept for reference. Note that
            # ``key``, ``value`` and ``p_uri`` are not defined in this scope.
            #
            # task.executor.command.user.value = 'root'
            #
            # # TODO LIST
            # environment = mesos_pb2.Environment()
            # variable = environment.variables.add()
            # variable.name = key
            # variable.value = value
            #
            # uri = task.executor.uris.add()
            # uri.value = p_uri
            # uri.executable = False
            # uri.extract = True
    
            # Executor-level resources (not requested at the moment):
            #
            # cpus = task.executor.resources.add()
            # cpus.name = "cpus"
            # cpus.type = mesos_pb2.Value.SCALAR
            # cpus.scalar.value = self.TASK_CPU
            #
            # mem = task.executor.resources.add()
            # mem.name = "mem"
            # mem.type = mesos_pb2.Value.SCALAR
            # mem.scalar.value = self.TASK_MEM
            # task.executor.source = None
            # task.executor.data = None
    
            task.task_id.value = "%d:%d" % (executor_id, task_id)
            task.slave_id.MergeFrom(offer.slave_id)
    
            task.executor.executor_id.value = str(executor_id)
            task.executor.framework_id.value = offer.framework_id.value
    
            return task
    
        def offerRescinded(self, driver, offerId):
            """Log a rescinded offer."""
            _logger.info('Offer rescinded %s', offerId.value)
    
        def statusUpdate(self, driver, taskStatus):
            """Log a task status update and stop the driver when all tasks ended.

            FINISHED, FAILED, KILLED and LOST count as terminal. The driver
            is stopped when the terminal count equals ``total_tasks``.
            """
            statuses = {
                mesos_pb2.TASK_STAGING: "STAGING",
                mesos_pb2.TASK_STARTING: "STARTING",
                mesos_pb2.TASK_RUNNING: "RUNNING",
                mesos_pb2.TASK_FINISHED: "FINISHED",
                mesos_pb2.TASK_FAILED: "FAILED",
                mesos_pb2.TASK_KILLED: "KILLED",
                mesos_pb2.TASK_LOST: "LOST",
            }
            terminal_states = (
                mesos_pb2.TASK_FINISHED,
                mesos_pb2.TASK_FAILED,
                mesos_pb2.TASK_KILLED,
                mesos_pb2.TASK_LOST,
            )
    
            # States missing from the table above are logged by their raw
            # value instead of raising KeyError inside the driver callback.
            state_name = statuses.get(taskStatus.state, taskStatus.state)
            _logger.info("Received status update for task %s (%s)", taskStatus.task_id.value, state_name)
    
            if taskStatus.state in terminal_states:
                self.terminal += 1
    
            if self.terminal == self.total_tasks:
                driver.stop()
    
        def frameworkMessage(self, driver, executorId, slaveId, data):
            """Log a message sent by an executor."""
            _logger.info('Message from executor %s and slave %s : %s', executorId.value, slaveId.value, data)
    
        def slaveLost(self, driver, slaveId):
            """Log the loss of a slave."""
            _logger.info('Slave %s has been lost', slaveId.value)
    
        # NOTE(review): ``dirver`` is a typo for ``driver``; the name is kept
        # so the signature stays unchanged.
        def executorLost(self, dirver, executorId, slaveId, exitCode):
            """Log the loss of an executor and its exit code."""
            _logger.info('Executor %s has been lost on slave %s with exit code %s', executorId.value, slaveId.value, exitCode)
    
        def error(self, driver, message):
            """Log an error reported by the driver."""
            _logger.info('There was an error:%s', message)
    
    
    
    if __name__ == '__main__':
        # Script entry point: queue num_tasks x num_executors ping jobs and
        # run the example framework against a fixed master until it stops.
        logging.basicConfig(level=logging.DEBUG)
    
        num_tasks = 50
        num_executors = 5
        ping_host = "www.360.cn"
    
        # Each work item is (executor index, task index, ping arguments).
        task_queue = Queue.Queue()
        for task_no in xrange(num_tasks):
            for executor_no in xrange(num_executors):
                task_queue.put((executor_no, task_no, [ping_host]))
    
        framework_info = mesos_pb2.FrameworkInfo()
        framework_info.name = "Test Python Framework of Silence"
        framework_info.user = "root"
    
        scheduler = TestScheduler(task_queue)
        driver = PesosSchedulerDriver(scheduler, framework_info, 'master@192.168.56.101:5050')
    
        _logger.info('Starting driver')
        driver.start()
    
        # Blocks until the scheduler calls driver.stop() (or the driver aborts).
        _logger.info('Joining driver')
        driver.join()
    

    相关文章

      网友评论

          本文标题:Mesos 开发(调度器 & 调度驱动、执行器 &

          本文链接:https://www.haomeiwen.com/subject/tbvrtxtx.html