作者:徐佳
文为原创文章,转载请注明作者及出处
在介绍Juice之前,我想先聊一聊Mesos,Mesos被称为2层调度框架,是因为Master通过内部的Allocator完成Master->Framework的第一层调度,再由Framework通过调度器完成对于资源->任务的分配,这个过程称为第二层调度。
About MesosFramework
先来看一看Mesos&Framework的整体架构图:
arch.png
Mesos的Framework分为2部分组成,分别为调度器和执行器。
调度器被称为Scheduler,从Mesos1.0版本开始,官方提供了基于HTTP的RestAPI供外部调用并进行二次开发。
Scheduler用于处理Master端发起的回调事件(资源列表并加载任务、任务状态通知等),进行相应处理。Agent接收到Master分配的任务时,会根据任务的container-type进行不同的处理,当处理默认container-type='Mesos'时,先检查Framework所对应的Executor进程是否启动,如果没有启动则会先启动Executor进程,然后再提交任务到该Executor去执行,当运行一个container-type='Docker'的任务时,则启动Docker Executor进行处理,程序的运行状态完全取决于Docker内部的处理及返回值。
MesosFramework交互API
交互分为2部分API,分别为SchedulerAPI(http://mesos.apache.org/documentation/latest/scheduler-http-api/) 与ExecutorAPI(http://mesos.apache.org/documentation/latest/Executor-http-api/), 每个API都会以TYPE来区分,具体的处理流程如下:
1.Scheduler提交一个请求(type='SUBSCRIBE')到Master(http://master-ip:5050/api/v1/scheduler), 并需要设置'subscribe.framework_info.id',该ID由Scheduler生成,在一个Mesos集群中必须保证唯一,Mesos以此FrameworkID来区分各个Framework所提交的任务,发送完毕后,Scheduler端等待Master的'SUBSCRIBE'回调事件,Master的返回事件被定义在event对象中,event.type为'SUBSCRIBE'(注意:'SUBSCRIBE'请求发起后,Scheduler与Master端会保持会话连接(keep-alive),Master端主动发起的事件回调都会通过该连接通知到Scheduler)。(scheduler-http-api中接口'SUBSCRIBE')
2.Master主动发起'OFFERS'事件回调,通知Scheduler目前集群可分配使用资源,事件的event.type为'OFFERS'。(scheduler-http-api中接口'OFFERS')
3.Scheduler调用resourcesOffer为Offers安排Tasks。当完成任务分配后,主动发起'ACCEPT'事件请求到Master端告知Offers-Tasks列表。(scheduler-http-api中接口'ACCEPT')
4.Master接收到Scheduler的任务请求后,将任务发送到OfferId对应的Agent中去执行任务。
5.Agent接收到任务,检查任务对应的Executor是否启动,如启动,则调用该Executor执行任务,如未启动,则调用lauchExecutor()创建Executor对象并执行initialize()初始化Executor,Executor初始化过程中会调用RegisterExecutorMessage在Agent上注册,之后便接受任务开始执行。(Executor-http-api中接口'LAUNCH')
6.Executor执行完毕或错误时通知Agent任务的task_status。(Executor-http-api中接口'UPDATE')
7.Agent再同步task_status给Master,Master则调用'UPDATE'事件回调,通知Scheduler更新任务状态。(scheduler-http-api中接口'UPDATE')
8.Scheduler确认后发送'ACKNOWLEDGE'请求告知Master任务状态已确认。(scheduler-http-api中接口'ACKNOWLEDGE')
任务状态标示及Agent宕机处理###
对于一个任务的运行状态,Mesos定义了13种TASK_STATUS来标示,常用的有以下几种:
TASK_STAGING-任务准备状态,该任务已有Master分配给Slave,但Slave还未运行时的状态。
TASK_RUNNING-任务已在Agent上运行。
TASK_FINISHED-任务已运行完毕。
TASK_KILLED-任务被主动终止,调用scheduler-http-api中'KILL'接口。
TASK_FAILED-任务执行失败。
TASK_LOST-任务丢失,通常发生在Slave宕机。
当Agent宕机导致TASK_LOST时,Mesos又是怎么来处理的呢?
在Master和Agent之间,一般都是由Master主动向每一个Agent发送Ping消息,如果在设定时间内(flag.slave_ping_timeout,默认15s)没有收到Agent的回复,并且达到一定次数(flag.max_slave_ping_timeouts,默认次数为5),那么Master会操作以下几个步骤:
1.将该Agent从Master中删除,此时该Agent的资源将不会再分配给Scheduler。
2.遍历该Agent上运行的所有任务,向对应的Framework发送任务的Task_Lost状态更新,同时把这些任务从Master中删除。
3.遍历该Agent上的所有Executor,并删除。
4.触发Recind Offer,把这个Agent上已经分配给Scheduler的Offer撤销。
5.把这个Agent从master的Replicated log中删除(Mesos Master依赖Replicated log中的部分持久化集群配置信息进行failer over/recovery)。
使用Marathon可以方便的发布及部署应用###
目前有很多基于MesosFramework的开源框架,例如Marathon。我们在生产环境中已经使用了Marathon框架,一般用它来运行long-run service/application,依靠marathon来管理应用服务,它支持应用服务自动/手动起停、水平扩展、健康检查等。我们依靠jenkins+docker+marathon完成服务的自动化发布及部署。
Why Juice
下面来讲下我基于MesosFramework所开发的一套框架-Juice。(开源地址:https://github.com/HujiangTechnology/Juice.git)
在开发Juice之前,我公司所有的音视频转码切片任务都是基于一个叫TaskCenter的队列分配框架,该框架并不具备分布式调度的功能(资源分配),所以集群的资源利用率一直是个问题,所以,我们想开发一套基于以下三点的新框架来替代老的TaskCenter。
1.一个任务调度型的框架,需要对资源(硬件)尽可能的做到最大的利用率。
2.框架必须可运行各种类型的任务。
3.平台必须是稳定的。
凭借对Marathon的使用经验,以及对于Mesos相关文档的查阅,我们决定基于MesosFramework来开发一套任务调度型的框架,Mesos与Framework的特性刚才已经说过了,而我们将所需要执行的任务封在Docker中去执行,那么对于框架本身来说他就不用关心任务的类型了,这样业务的边界和框架的边界就变得很清晰,对于Framework来说,运行一个Docker任务也很方便,刚才说过Mesos内置了DockerExecutor可以完美的启动Docker任务,这样,我们的框架在Agent端所需要的开发就非常的少。
Juice框架在这样的背景下开始了开发的历程,我们对于它的定位是一套分布式任务云系统,这里为什么要称为任务云系统呢?因为对于调用者来说,使用Juice,只要做2件事情:把要做的任务打成Docker镜像并Push到docker仓库中,然后向Juice提交一个Docker类型的任务。其它的,交给Juice去完成就可以了,调用者不用关心任务会在哪台物理机上被执行,只需要关心任务本身的执行状况。
Juice架构
除此,Juice有以下一些特点,Juice框架分为Juice-Rest(Juice交互API层,可以完成外界对于Juice Task的CRUD操作)和Juice-Service(Juice核心层,负责与MesosMaster之间的交互,资源分配、任务提交、任务状态更新等),在一套基于Juice框架的应用系统中,通常部署1-N个Juice-Rest(取决于系统的TPS),以及N个Juice-Service(Juice-Service分主从模式,为1主多从,by zookeeper),对于同一个Mesos集群来说,可以部署1-N套Juice框架,以FrameworkID来区分,需要部署多套的话在Juice-Service的配置文件中设置mesos.framework.tag为不同的值即可。
Juice.pngJuice-Rest参数设置
Juice-Rest采用Spring-Boot编写(Juice-API接口参见:https://github.com/HujiangTechnology/Juice/blob/master/doc/api_document.md), 处理外界发起的对任务CURD操作,当提交一个任务到Juice-Rest时,需要设置一些参数,比如:
example to run docker:
{
"callbackUrl":"http://www.XXXXXXXX.com/v5/tasks/callback",
"taskName":"demo-task",
"env":{"name":"environment","value":"dev"},
"args":["this is a test"],
"container":{
"docker":{
"image":"dockerhub.XXXX.com/demo-slice"
},
"type":"DOCKER"
}
}
其中Container中的type目前仅支持'Docker',我们没有加入'Mesos'类型的Container模式是因为目前项目组内部的服务已经都基于Docker化,但是预留了'Mesos'类型,在未来可以支持'Mesos'类型的任务。
commands模式支持运行Linux命令行命令和Shell脚本,比如:
"commands":"/home/app/entrypoint.sh"
这里支持Commands模式的原因有2点
1.有时调用方可能只是想在某台制定的Agent上运行一个脚本。
2.公司内部其他有些项目组还在使用Jar包启动的模式,预留一个Shell脚本的入口可以对这些项目产生支持。
env设置示例,设置运行的任务环境为dev:
"env":{"name":"environment","value":"dev"}
args设置示例,设置文件路径:
"args":["/tid/res/test.mp4"]
PS:使用Commands模式时不支持args选项。
此外,Juice-Rest支持用户自定义资源大小(目前版本仅支持自定义CPU、内存),如需要指定资源,需在请求接口中配置resources对象,否则,将会使用默认的资源大小运行任务。Juice-Rest支持资源约束(constrains),即满足在特定Host或Rack_id标签的Agent上运行某任务,设置接口中constrains对象字段即可。
Juice所使用的中间件(MQ、DB等)
下面讲一下Rest层的处理模型,当外界发起一个任务请求时,Juice-Rest接收到任务后,并不是直接提交到Juice-Service层,而是做了以下2件事情:
1.将任务放入MQ中。(目前Juice使用Redis-List来作为默认的Queue,采用LPUSH、RPOP的模式,先进先出,为什么选择使用Redis中的List作为Queue而没有选择其他诸如rabbitmq、kafka这些呢,首先,Redis相对来说是一个比较轻量级的中间件,而且HA方案比较成熟,同时,在我看来,队列中的最佳任务wait数量是应该<10000的,否则,任务的执行周期将会被拉得很长,以我公司的Juice系统来举例,由于处理的都是耗时的音视频转码切片任务,通常情况下10000个任务的排队等候时间会在几个小时以上,所以当任务数量很大时,考虑扩大集群的处理能力而不是把过多的任务积压在队列中,基于此,选择Redis-List相对其他的传统MQ来说没有什么劣势。考虑到一些特殊情况,Juice也允许用户实现CacheUtils接口使用其他MQ替换Redis-List)。
2.纪录Tasks信息到Juice-Tasks表中,相当于数据落地。后续版本会基于此实现任务重试机制(目前的1.1.0内部开发版本已实现),或者在failover切换后完成任务恢复,此功能在后续1.2.0版本中考虑加入。(目前数据库使用MySql)。
当Juice-Rest接受并完成任务提交后会返回给调用方一个Long型18位数字(JuiceID,全局唯一)作为凭证号。当任务完成后,Juice-Rest会主动发起回调请求,通知调用方该任务的运行结果(以此JuiceID作为业务凭证),前提是调用方必须设置callbackUrl。同时,调用方可以使用该JuiceID对进行任务查询、终止等操作。
另外,在Juice-Rest层单独维护一个线程池来处理由Juice-service端返回的任务状态信息Task_status。
Juice-Service内部处理流程
Juice-Service可以看作是一个MesosFramework,与Master之间通讯协议采用ProtoBuf,每一种事件请求都通过对应类型的Call产生,这里Juice-Service启动时会发出Subscribe请求,由SubscribeCall()方法产生requestBody,采用OKHTTP发送,并维持与Master之间的长连接
private void connecting() throws Exception {
InputStream stream = null;
Response res = null;
try {
Protos.Call call = subscribeCall();
res = Restty.create(getUrl())
.addAccept(protocol.mediaType())
.addMediaType(protocol.mediaType())
.addKeepAlive()
.requestBody(protocol.getSendBytes(call))
.post();
streamId = res.header(STREAM_ID);
stream = res.body().byteStream();
log.info("send subscribe, frameworkId : " + frameworkId + " , url " + getUrl() + ", streamId : " + streamId);
log.debug("subscribe call : " + call);
if (null == stream) {
log.warn("stream is null");
throw new DriverException("stream is null");
}
while (true) {
int size = SendUtils.readChunkSize(stream);
byte[] event = SendUtils.readChunk(stream, size);
onEvent(event);
}
} catch (Exception e) {
log.error("service handle error, due to : " + e);
throw e;
} finally {
if (null != stream) {
stream.close();
}
if (null != res) {
res.close();
}
streamId = null;
}
}
之后便进入while循环,当Master端的通知事件发生时,调用onEvent()方法执行。
Mesos的回调事件中,需要特别处理的主要事件由以下几种:
1.SUBSCRIBED:Juice框架在接收到此事件后将注册到Master中的FrameworkID纪录到数据库juice_framework表中。
2.OFFERS:当Juice-Service接收到该类型事件时,便会进入资源/任务分配环节,分配任务资源并提交到MesosMaster。
3.UPDATE:当Agent处理完任务时,任务会由Executor->Agent->Master->Juice-Service来完成任务的状态通知。Juice-Service会将结果塞入result-list中。
4.ERROR:框架产生问题,通常这样的问题分两种,一种是比较严重的,例如Juice-Service使用了一个已经被Master端移除的FrameworkID,则Master会返回"framework has been removed"的错误信息,Juice-Service此时会抛出UnrecoverException错误:
throw new UnrecoverException(message, true)
Juice-Service在处理UnrecoverException类的错误时会Reset服务,当第二个参数为True时,会重新生成一个新的FrameworkID。
而当其他类型的错误,比如Master和Juice-Service之间的长链接中断,仅仅Reset服务。
下面我想详细来说说第二步,我们先来看下'OFFERS'请求处理代码段:
private void onEvent(byte[] bytes) {
....
switch (event.getType()) {
...
case OFFERS:
try {
event.getOffers().getOffersList().stream()
.filter(of -> {
if (SchedulerService.filterAndAddAttrSys(of, attrMap)) {
return true;
}
declines.add(of.getId());
return false;
})
.forEach(
of -> {
List<TaskInfo> tasks = newArrayList();
String offerId = of.getId().getValue();
try {
SchedulerService.handleOffers(killMap, support, of, attrMap.get(offerId), declines, tasks);
} catch (Exception e) {
declines.add(of.getId());
tasks.forEach(
t -> {
AuxiliaryService.getTaskErrors()
.push(new TaskResult(com.hujiang.juice.common.model.Task.splitTaskNameId(t.getTaskId().getValue())
, ERROR, "task failed due to exception!"));
}
);
tasks.clear();
}
if (tasks.size() > 0) {
AuxiliaryService.acceptOffer(protocol, streamId, of.getId(), frameworkId, tasks, getUrl());
}
}
);
if (declines.size() > 0) {
AuxiliaryService.declineOffer(protocol, streamId, frameworkId, SchedulerCalls.decline(frameworkId, declines), getUrl());
}
long end = System.currentTimeMillis();
} finally {
declines.clear();
attrMap.clear();
}
break;
...
}
}
该段代码是分配Offer-tasks的核心代码,来看几个方法:
1.SchedulerService.filterAndAddAttrSys(),该方法作用是过滤不符合的OFFER,我们知道在Mesos的Agent中是可以通过配置Attr来使一些机器跑特殊的任务,而这里的过滤正是基于该特性,比如我们设置了该Juice-Service只使用包含以下Attr属性的资源时(在配置文件application.properties中)
mesos.framework.attr=lms,qa,mid|big
经过了SchedulerService.filterAndAddAttrSys()方法的过滤,符合以上attr的资源会被选取执行任务。同时不符合的Offer会加入declines List,通过AuxiliaryServic.declineOffer()一次性发送给Master告知忽略。
Agent的attr设置通过/etc/mesos-slave/attributes来设置。这个文件通常为这样的:
cat /etc/mesos-slave/attributes
bz:xx;
env:xx;
size:xx;
rack_id:xx;
dc:xx
2.SchedulerService.handleOffers(),该方法实现了原先MesosFramework中的resourceOffer的功能,对Offer进行Tasks分配,最后产生TaskInfo List,由AuxiliaryService.acceptOffer()发送给Master通知处理任务。
注意:Master在发送完Offer事件通知后会一直处于wait状态,直到Framework端调用Accept call(AuxiliaryService.acceptOffer())或Decline call(AuxiliaryServic.declineOffer())来告知Master资源是否使用后才会通知下一个Framework去分配资源。(默认Master会一直等待,如果没有通知,则Mesos集群中的资源利用率将可能达到100%,可以通过在Master端设置Timeout来避免这个问题。)
在Juice-Service内部,当SchedulerDriver与Master产生交互后,Juice-Service的处理逻辑由SchedulerService以及AuxiliaryService来实现。
SchedulerService处理Juice的主要逻辑,比如资源分配算法、任务优先级算法,所有Master回调事件处理方法都定义在SchedulerService中。
AuxiliaryService维护几组线程池,完成各自任务,刚才看到的AuxiliaryService.acceptOffer()和AuxiliaryServic.declineOffer(),都是通过调用AuxiliaryServic中的send-pool去完成call的发送,另外还有一些管理类的任务(比如实时查询任务状态、终止正在运行的任务等等)通过auxiliary-pool去完成。所以,AuxiliaryServic的调用都是异步的。
Juice中各种队列的功能介绍
刚才介绍了Juice的任务在JuiceRest提交时是被放入了一个MQ中,这个MQ在Juice-Service中被称为juice.task.queue。除此之外,还有另外几个MQ,分别是juice.task.retry.queue、juice.task.result.queue、juice.management.queue。下面来分别说说这些Queue的用处。
juice.task.retry.queue:Juice-Service在取任务时是按照每一个Offer轮询分配的,当一个Offer在分配资源时,假如从MQ中R-POP出来的任务不满足该Offer时(比如need-resources大于该Offer的max offer value时,或者存在constrains,当前的offer和指定执行任务的offer不match时),这时,Juice-Service的做法是将当前任务放入juice.task.retry.queue中,等待下一次Offer分配时,优先从juice.task.retry.queue获取任务并分配,这里涉及到Juice内部获取任务Queue的优先级,我用了一个比较简单的方式,即每次分配一个新的Offer资源时,先从juice.task.retry.queue中取出一定数目的任务(CACHE_TRIES = 5),当还有剩余资源时,则从juice.task.queue中取任务,直到撑满这个Offer。另外,处于juice.task.retry.queue会有淘汰机制,目前的任务淘汰机制遵循2点,当先触发以下某一项时,则该任务会认为失败,任务的Task_status被设置为Task_Failed,放入juice.task.result.queue,任务的淘汰算法如下:
1.过期时间淘汰制,任务处于juice.task.result.queue的时长>TASK_RETRY_EXPIRE_TIME,则淘汰(DEFAULT_TASK_RETRY_EXPIRE_TIME = 86400秒)。
2.大于最大检索次数,任务被取出检索但没有被执行达到最大检索次数>MAX_RESERVED,则淘汰(DEFAULT_MAX_RESERVED = 1024)。
juice.task.result.queue:任务结果队列,Juice-Service在得到一个任务的状态后(不一定是最终状态),将任务的TaskResult对象放入juice.task.result.queue,Juice-Rest端从该队列取出TaskResult,如果已经是任务的最终状态,比如Task_Finished或者Task_Failed,则通过外部在提交任务时所填写的callbackUrl回调调用方告知任务状态。
juice.management.queue:管理类队列,支持放入Reconcile类或Kill类的任务,由AuxiliaryService发起任务的查询同步或Kill一个正在执行的任务。
通过SDK提交一个任务###
目前开源的Juice版本,已经提供了完整的SDK来完成对于Juice-Rest之间的交互,以下是提交一个docker任务的示例:
总结及未来
目前Juice 1.1.0开源版本已经处于测试阶段,新版本除修复一些Bug之外,还增加了2个新功能:
1.增加了任务插队功能,可以通过在传入参数中设置priority=1来提高一个任务的执行优先级,该任务会被置于处理队列的最前端。
2.任务失败自动重试功能,设置传入参数retry=1,任务失败会自动重试,最多重试3次。
面对复杂的业务需求,Juice目前的版本还有一些特性/功能不支持,对于此,最好的方式是请大家Fork这个项目的Git,或直接联系本人,大家一起来把Juice做好。
@Test
public void submitsDocker() {
Submits submitsDocker = Submits.create()
.setDockerImage("dockerhub.XXXX.com/demo-slice")
.setTaskName("demo-slice")
.addArgs("/10002/res/L2.mp4")
.addEnv("environment", "dev")
.addResources(2.0, 2048.0);
Long taskId = JuiceClient.create("http://your-juice-rest-host/v1/tasks", "your-system-id-in-string")
.setOperations(submitsDocker)
.handle();
if(null != taskId) {
System.out.println("submitsDocker, taskId --> " + taskId);
}
}
Q&A:
Q.juice与elastic job的差异
A.我本身对于elastic job并不算太熟悉,就随便说几点,如果有错还请各位纠正:
首先juice与elastic-job-cloud都基于mesos,资源-任务分配这块elastic-job用了Fenzo(netflix),而juice是自己开发的调度算法。
juice在作业调用时不需要作业注册,只要上传任务的镜像(Docker)到仓库及任务触发。而elastic-job需要注册作业。
juice在Rest-Api接口上近乎完全和marathon一致,方便一些使用惯marathon部署service的用户。
juice目前版本并不支持作业分片。
Q.能详细介绍下任务资源分配这一块的算法吗?
A.之前已经简单介绍过了,通过接收'OFFERS'事件触发相关任务-资源分配的代码块。
由于得到的Offer对象实际为一个列表,处理逻辑会循环为每一个Offer分配具体的任务,而每个Offer的任务列表总资源(CPU,Memory等)必需小于Offer resources * RESOURCES_USE_THRESHOLD(资源使用阀值,可通过配置文件resources.use.threshold设置,默认0.8),每分配完一个Offer的task_infos后,便生成Accept Call由发送线程池进行发送处理,整个过程都是异步非阻塞的。
Q.所有的任务都存档在docker里面对于一些临时的任务如何处理?
A.临时的任务确实会产生一些垃圾的镜像,需要定期对Docker仓库进行清理,一般设置清理周期为1个月。
Q.任务系统是是否有帮助用户完成docker封装的操作?
A.目前没有,所以使用者必需会一些Docker的基本操作,至少要会打镜像,提交镜像等。当然,像一些Docker的设置,比如挂载volume,网络(bridge、host)等可以在提交任务时通过参数设置。
Q.Mesos和kubernetes的优劣势是什么?
A.其实我主要使用Mesos,Mesos相对K8S应该是一套更重的系统,Mesos更像是个分布式操作系统,而K8S在容器编排方面更有优势(Pod之类)。
网友评论