一。JobVertex
在jobgraph中有一个组成“元素”:JobVertex是不得不提的:jobvertex用于产生intermediate dataset,并通过jobedge串联不同的jobvertex同时也是将operator chain的"关键点"。 jobvertex是从job层面对task进行抽象。
二。源码
第一部分:属性或字段
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
// 当前jobvertex id,与之关联的对应类:JobVertexID
// 关于jobvertex id产生:
//
/** The ID of the vertex. */
private final JobVertexID id;
// 候选vertex id
/** The alternative IDs of the vertex. */
private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
// jobvertex关联的operator
/** The IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
// 候选operatorid
/** The alternative IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
// 当前jobvertex产出的临时中间数据集:IntermediateDataSets
/** List of produced data sets, one per writer */
private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
// 当前jobvertex提供给下游jobvertex读取的通道:一个下游读取vertex对应一个reader关联一个jobedge
/** List of edges with incoming data. One per Reader. */
private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
// 该task在runtime时被分割的subtask
/** Number of subtasks to split this task into at runtime.*/
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
// 当前task在runtime时被分割成subtask的最小任务
/** Maximum number of subtasks to split this task into a runtime. */
private int maxParallelism = -1;
// 当前jobvertex需要最小程度的资源
/** The minimum resource of the vertex */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
// 当前jobvertex采用的最优资源
/** The preferred resource of the vertex */
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
// 在runtime期间分配给当前task的配置信息
/** Custom configuration passed to the assigned task at runtime. */
private Configuration configuration;
// 当前task执行invoke的class
/** The class of the invokable. */
private String invokableClassName;
// 标示当前jobvertex是否停止
/** Indicates of this job vertex is stoppable or not. */
private boolean isStoppable = false;
// input通过format被split
/** Optionally, a source of input splits */
private InputSplitSource<?> inputSplitSource;
// 当前jobvertex的name
/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
private String name;
// 通过定义sharing group来保证来自不同jobvertex能够并发运行在一个slot里面
// 特别是使用coLocation时 是需要对应的task属于同于一个sharding group
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
private SlotSharingGroup slotSharingGroup;
// jobvertex的子任务共享slot的组,operator chain
/** The group inside which the vertex subtasks share slots */
private CoLocationGroup coLocationGroup;
// 如下参数都是被记录在json plan中的
// operator name
/** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan */
private String operatorName;
// 针对当前jobvertex的描述
/** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
* to be included in the JSON plan */
private String operatorDescription;
/** Optional, pretty name of the operator, to be displayed in the JSON plan */
private String operatorPrettyName;
// 主要记录针对operator进行优化的property,会被写入到json plan中
/** Optional, the JSON for the optimizer properties of the operator result,
* to be included in the JSON plan */
private String resultOptimizerProperties;
// 调度这个jobvertex时,依赖输入限制策略:
// 1。ANY:只要上游的input相关的subtask有完成的 即可开启当前jobvertex的subtask处理
// 2。ALL:必须等待上游的input相关subtasks全部完成,才会启动jobvertex的subtask来获取数据
// 一般来说在指定一致性语义时需要注意这两种策略
/** The input dependency constraint to schedule this vertex. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
第二部分:方法或函数
- 被执行的具体task
/**
* Returns the invokable class which represents the task of this vertex
*
* @param cl The classloader used to resolve user-defined classes
* @return The invokable class, <code>null</code> if it is not set
*/
// 用来指定当前jobvertex对应具体的task
// 比如DataSinkTask/BatchTask等 会被执行的task
public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
if (cl == null) {
throw new NullPointerException("The classloader must not be null.");
}
if (invokableClassName == null) {
return null;
}
try {
return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
}
catch (ClassNotFoundException e) {
throw new RuntimeException("The user-code class could not be resolved.", e);
}
catch (ClassCastException e) {
throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
}
}
2.跟operator chain相关的内容
/**
* Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
* slot sharing group can run one subtask each in the same slot.
*
* @param grp The slot sharing group to associate the vertex with.
*/
// 将该jobvertex通过slotgroup组合起来便于schedule,
// 只要在相同的slot group隶属不同的job vertex能够彼此在同一个slot被调用
public void setSlotSharingGroup(SlotSharingGroup grp) {
if (this.slotSharingGroup != null) {
this.slotSharingGroup.removeVertexFromGroup(id);
}
this.slotSharingGroup = grp;
if (grp != null) {
grp.addVertexToGroup(id);
}
}
/**
* Gets the slot sharing group that this vertex is associated with. Different vertices in the same
* slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
* a slot sharing group, this method returns {@code null}.
*
* @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
*/
// 当一个jobvertex未被分配slotgroup时 与该jobvertex是没有关联的slotgroup,直接返回null
// 默认情况下slotgroup名字:default
@Nullable
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
/**
* Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
* Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
* instance (TaskManager) as the n'th subtask of the given vertex.
*
* NOTE: Co-location is only possible between vertices in a slot sharing group.
*
* NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
* respective vertex must be a (transitive) input of this vertex.
*
* @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
*
* @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
* slot sharing group.
*
* @see #setSlotSharingGroup(SlotSharingGroup)
*/
// 将当前jobvertex与给定的jobvertex的subtasks关联在一起
// 也就意味着比如当前jobvertex的第n个subtask和给定的jobvertex第n任务在相同tm上被同一个slot执行
// 需要一些限制条件:
// 1。需要当前的jobvertex和给定jobvertex隶属同一个slotgroup
// 2。当前jobvertex的input是依赖与之关联jobvertex,是可传递的,forward
// 在进行operator chain时会通过这种方式来进行处理
public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
}
CoLocationGroup thisGroup = this.coLocationGroup;
CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
// 首先保证两个jobvertex隶属同一slotgroup
if (otherGroup == null) {
if (thisGroup == null) { // 两个jobvertex不隶属任何slotgroup;直接构建一个colocationgroup将jobvertex关联在一起
CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
this.coLocationGroup = group;
strictlyCoLocatedWith.coLocationGroup = group;
}
else { // 指定的jobvertex没有对应的slotgroup,直接使用当前的jobvertex所在的slotgroup
thisGroup.addVertex(strictlyCoLocatedWith);
strictlyCoLocatedWith.coLocationGroup = thisGroup;
}
}
else {
if (thisGroup == null) { // 当前的jobvertex不存在对应的slotgroup,使用给定的jobvertex的slotgroup
otherGroup.addVertex(this);
this.coLocationGroup = otherGroup;
}
else { // 两个jobvertex具备不同的slotgroup 需要进行合并
// both had yet distinct groups, we need to merge them
thisGroup.mergeInto(otherGroup);
}
}
}
- ResultPartitionType
表示jobvertex产生intermediate dataset后以什么方式进行partition
/**
* Blocking partitions represent blocking data exchanges, where the data stream is first
* fully produced and then consumed. This is an option that is only applicable to bounded
* streams and can be used in bounded stream runtime and recovery.
*
* <p>Blocking partitions can be consumed multiple times and concurrently.
*
* <p>The partition is not automatically released after being consumed (like for example the
* {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
* that the partition is no longer needed.
*/
// 以block方式传输数据
// 需要当前的数据要全部生产完成,方可消费
// 该partition方式常用于bound stream,既能用于正常的数据处理又可以用于故障恢复
// 需要注意一点:block产生的partition可以被重复消费
BLOCKING(false, false, false, false),
/**
* BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have
* a user-specified life cycle.
*
* <p>BLOCKING_PERSISTENT partitions are dropped upon explicit API calls to the
* JobManager or ResourceManager, rather than by the scheduler.
*
* <p>Otherwise, the partition may only be dropped by safety-nets during failure handling
* scenarios, like when the TaskManager exits or when the TaskManager looses connection
* to JobManager / ResourceManager for too long.
*/
// 类似block partition方式,不过用户可以指定生命周期
// 针对BLOCKING_PERSISTENT partition被清理,只能由jobmanager或resourcemanager来完成,不能通过scheduler来完成
// 不过出现tm退出或者tm与jobmanager间失联时间过长,此时BLOCKING_PERSISTENT partition只能在失败处理期间通过safety-nets来清理
BLOCKING_PERSISTENT(false, false, false, true),
/**
* A pipelined streaming data exchange. This is applicable to both bounded and unbounded streams.
*
* <p>Pipelined results can be consumed only once by a single consumer and are automatically
* disposed when the stream has been consumed.
*
* <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
* the {@link #PIPELINED_BOUNDED} variant.
*/
// 数据以pipelined的方式进行传输,能够支持流和批处理
// 针对pipelined产生的partition只能被消费一次并且是一个consumer;
// 一旦pipelined的partition被消费过 将会自动被丢弃
PIPELINED(true, true, false, false),
/**
* Pipelined partitions with a bounded (local) buffer pool.
*
* <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
* data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
* overall network buffer pool size, this, however, still allows to be flexible with regards
* to the total number of partitions by selecting an appropriately big network buffer pool size.
*
* <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
* no checkpoint barriers.
*/
// 类似Pipelined partition,不过它还附带了一个有界的本地buffer pool
// PIPELINED_BOUNDED用于stream job时通过指定固定大小的buffer pool
// 方面一。能够将数据固定大小的数据进行buffer,这样也可以使得checkpoint对齐不会被延迟太久,整体数据处理吞吐量也会提升;
// 另一方面,由于使用的是固定大小的buffer pool将数据buffer,能够相对较好调整及时性和吞吐量两者的平衡
// 不过该partition方式的本地buffer pool不同于network buffer pool大小的限制,该方式能通过选择适当的大的网络缓冲池大小来灵活地控制分区的总数
// 针对batch job来说由于不存在checkpoint对齐的过程,是没有限制的
PIPELINED_BOUNDED(true, true, true, false);
三。源码
JobVertex.java源码
网友评论