Ray Version 0.4.0

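The task header file describes the Task data structure.
A Task represents a Ray task together with its specification, which has two parts: a mutable part obtained at execution time and an immutable part fixed at submission time. These are represented by TaskExecutionSpecification and TaskSpecification, respectively. Both must be supplied to construct a Task, and Task provides a getter for each.
Task also tracks the task's object dependencies, which comprise the immutable task arguments and the mutable execution-time dependencies, and exposes them through a getter. In addition, Task has a function that checks whether the task depends on a given object.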

src/ray/raylet/task.h
----------------------------------

namespace ray {

/// A Task represents a Ray task and a specification of its execution (e.g.,
/// resource demands). The task's specification contains both immutable fields,
/// determined at submission time, and mutable fields, determined at execution
/// time.
class Task {
public:
/// Create a task.
///
/// \param execution_spec The execution specification for the task. These are
/// the mutable fields in the task specification that may change at task
/// execution time.
/// \param task_spec The immutable specification for the task. These fields
/// are determined at task submission time.
Task(const TaskExecutionSpecification &execution_spec,
const TaskSpecification &task_spec)
: task_execution_spec_(execution_spec), task_spec_(task_spec) {}

/// Destroy the task.
virtual ~Task() {}

// Get the task's execution specification (mutable).
const TaskExecutionSpecification &GetTaskExecutionSpec() const;

// Get the task's immutable specification.
const TaskSpecification &GetTaskSpecification() const;

/// Get the task's object dependencies. This comprises the immutable task
/// arguments and the mutable execution dependencies.
///
/// \return The object dependencies.
/// TODO(atumanov): consider returning a constant reference.
const std::vector<ObjectID> GetDependencies() const;

// Check whether the task depends on a given object.
bool DependsOn(const ObjectID &object_id) const;

private:
/// Task execution specification, consisting of all dynamic/mutable
/// information about this task determined at execution time.
TaskExecutionSpecification task_execution_spec_;
/// Task specification object, consisting of immutable information about this
/// task determined at submission time. Includes resource demand, object
/// dependencies, etc.
TaskSpecification task_spec_;
};

} // namespace ray
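
To make the dependency logic concrete before turning to task.cc, here is a minimal sketch of how the two kinds of dependencies could be combined. It is not the code from task.cc: `TaskSketch`, `ImmutableSpecSketch`, `MutableSpecSketch` and `MakeExampleTask` are hypothetical stand-ins, and `ObjectID` is modeled as a plain string rather than Ray's own ID type.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in: the real ObjectID is Ray's own ID type, not a string.
using ObjectID = std::string;

// Simplified stand-in for the immutable spec: one list of object IDs per
// argument (an empty list models an argument passed by value).
struct ImmutableSpecSketch {
  std::vector<std::vector<ObjectID>> arg_ids;
};

// Simplified stand-in for the mutable, execution-time spec.
struct MutableSpecSketch {
  std::vector<ObjectID> execution_dependencies;
};

class TaskSketch {
 public:
  TaskSketch(const MutableSpecSketch &execution_spec,
             const ImmutableSpecSketch &task_spec)
      : execution_spec_(execution_spec), task_spec_(task_spec) {}

  // Argument IDs first, then the execution-time dependencies. The result is
  // returned by value, like the declaration in the header.
  std::vector<ObjectID> GetDependencies() const {
    std::vector<ObjectID> dependencies;
    for (const auto &ids : task_spec_.arg_ids) {
      dependencies.insert(dependencies.end(), ids.begin(), ids.end());
    }
    const auto &extra = execution_spec_.execution_dependencies;
    dependencies.insert(dependencies.end(), extra.begin(), extra.end());
    return dependencies;
  }

  // Linear search over the combined dependency list.
  bool DependsOn(const ObjectID &object_id) const {
    const std::vector<ObjectID> dependencies = GetDependencies();
    return std::find(dependencies.begin(), dependencies.end(), object_id) !=
           dependencies.end();
  }

 private:
  MutableSpecSketch execution_spec_;
  ImmutableSpecSketch task_spec_;
};

// Builds a task with arguments {a}, {} and {b, c}, plus one execution
// dependency d.
TaskSketch MakeExampleTask() {
  ImmutableSpecSketch task_spec;
  task_spec.arg_ids.push_back(std::vector<ObjectID>{"a"});
  task_spec.arg_ids.push_back(std::vector<ObjectID>());
  task_spec.arg_ids.push_back(std::vector<ObjectID>{"b", "c"});
  MutableSpecSketch execution_spec;
  execution_spec.execution_dependencies.push_back("d");
  TaskSketch task(execution_spec, task_spec);
  // Three argument IDs plus one execution dependency.
  assert(task.GetDependencies().size() == 4);
  return task;
}
```

Returning the vector by value keeps the sketch simple. The TODO in the header suggests the authors were considering a constant reference to avoid the copy.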

Before analyzing task.cc, let's look at how the two specifications are defined:

src/ray/raylet/task_execution_spec.h   # header for the execution-time task specification
----------------------
namespace ray {

/// The task execution specification encapsulates all mutable information about
/// the task. These fields may change at execution time, converse to the
/// TaskSpecification that is determined at submission time.
class TaskExecutionSpecification {
public:
TaskExecutionSpecification(const std::vector<ObjectID> &&execution_dependencies);

// execution_dependencies: the task's execution dependencies
// spillback_count: the spillback count, i.e. the number of times a local scheduler has pushed this task to the global scheduler
TaskExecutionSpecification(const std::vector<ObjectID> &&execution_dependencies,
int spillback_count);

const std::vector<ObjectID> &ExecutionDependencies() const;

void SetExecutionDependencies(const std::vector<ObjectID> &dependencies);

int SpillbackCount() const;

/// Increment the spillback count for this task.
void IncrementSpillbackCount();

/// Get the task's last timestamp (ms).
/// \return The timestamp when this task was last received for scheduling.
int64_t LastTimeStamp() const;

void SetLastTimeStamp(int64_t new_timestamp);

private:
/// List of the IDs of the objects this task depends on.
std::vector<ObjectID> execution_dependencies_;
/// The last time this task was received for scheduling.
int64_t last_timestamp_;
/// The number of times this task was spilled back by local schedulers.
int spillback_count_;
};

} // namespace ray

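The execution-time task specification carries far less information than the submission-time task specification (defined in task_spec.h). It consists mainly of three variables: the execution-time dependencies, the spillback count, and the last timestamp.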
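
The three variables can be pictured with a small stand-in class. This is only a sketch under assumptions: `ExecutionSpecSketch` mirrors the accessor names from the header but is not the Ray class, the zero initial timestamp is assumed, `ObjectID` is modeled as a string, and `RecordSpillback` is a hypothetical helper showing what a scheduler might do when a task is handed off and received again.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in: the real ObjectID is Ray's own ID type, not a string.
using ObjectID = std::string;

class ExecutionSpecSketch {
 public:
  explicit ExecutionSpecSketch(std::vector<ObjectID> execution_dependencies,
                               int spillback_count = 0)
      : execution_dependencies_(std::move(execution_dependencies)),
        last_timestamp_(0),  // assumption: no timestamp recorded yet
        spillback_count_(spillback_count) {
    assert(spillback_count_ >= 0);
  }

  const std::vector<ObjectID> &ExecutionDependencies() const {
    return execution_dependencies_;
  }
  void SetExecutionDependencies(const std::vector<ObjectID> &dependencies) {
    execution_dependencies_ = dependencies;
  }

  int SpillbackCount() const { return spillback_count_; }
  void IncrementSpillbackCount() { ++spillback_count_; }

  int64_t LastTimeStamp() const { return last_timestamp_; }
  void SetLastTimeStamp(int64_t new_timestamp) {
    last_timestamp_ = new_timestamp;
  }

 private:
  std::vector<ObjectID> execution_dependencies_;
  int64_t last_timestamp_;  // milliseconds
  int spillback_count_;
};

// Hypothetical helper: count one more spillback and stamp the time (in ms)
// at which the task was received for scheduling again.
void RecordSpillback(ExecutionSpecSketch &spec, int64_t now_ms) {
  spec.IncrementSpillbackCount();
  spec.SetLastTimeStamp(now_ms);
}
```

All three fields can change after submission, which is why they live apart from the immutable TaskSpecification.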

src/ray/raylet/task_spec.h
-----------------------------------
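TaskID TaskId() const; // ID of this task
UniqueID DriverId() const; // identifies the job the task belongs to
TaskID ParentTaskId() const; // ID of the task that launched this task
int64_t ParentCounter() const; // number of tasks the parent task launched before this one
FunctionID FunctionId() const; // ID of the function the task executes
int64_t NumArgs() const; // number of arguments
int64_t NumReturns() const; // number of return values
bool ArgByRef(int64_t arg_index) const; // whether argument arg_index is passed by reference (as object IDs)
int ArgIdCount(int64_t arg_index) const; // number of object IDs in argument arg_index
ObjectID ArgId(int64_t arg_index, int64_t id_index) const; // the id_index-th object ID of argument arg_index
const uint8_t *ArgVal(int64_t arg_index) const; // pointer to the value of argument arg_index (for by-value arguments)
size_t ArgValLength(int64_t arg_index) const; // length in bytes of that value
double GetRequiredResource(const std::string &resource_name) const; // required quantity of a specific resource
const ResourceSet GetRequiredResources() const; // the full set of required resources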

private:
/// Task specification constructor from a pointer.
TaskSpecification(const uint8_t *spec, size_t spec_size);
/// Get a pointer to the byte data.
const uint8_t *data() const;
/// Get the size in bytes of the task specification.
size_t size() const;

/// The task specification data.
std::vector<uint8_t> spec_;
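
The argument accessors are easiest to read as a nested loop: for each argument, either walk its object IDs or read its inline value. The sketch below shows that pattern against a mock. `ArgSpecSketch`, `ArgSummary` and `SummarizeArgs` are hypothetical names, `ObjectID` is modeled as a string, and the rule that an argument counts as by-reference when it holds at least one object ID is an assumption of this mock. The real TaskSpecification reads these fields from its serialized `spec_` buffer instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in: the real ObjectID is Ray's own ID type, not a string.
using ObjectID = std::string;

// Mock with the same accessor names as the listing above.
class ArgSpecSketch {
 public:
  void AddArgByRef(const std::vector<ObjectID> &ids) {
    assert(!ids.empty());
    Arg arg;
    arg.ids = ids;
    args_.push_back(arg);
  }
  void AddArgByValue(const std::vector<uint8_t> &bytes) {
    Arg arg;
    arg.value = bytes;
    args_.push_back(arg);
  }

  int64_t NumArgs() const { return static_cast<int64_t>(args_.size()); }
  // Assumption of this mock: by-reference means "has at least one object ID".
  bool ArgByRef(int64_t arg_index) const { return !At(arg_index).ids.empty(); }
  int ArgIdCount(int64_t arg_index) const {
    return static_cast<int>(At(arg_index).ids.size());
  }
  ObjectID ArgId(int64_t arg_index, int64_t id_index) const {
    return At(arg_index).ids.at(static_cast<size_t>(id_index));
  }
  const uint8_t *ArgVal(int64_t arg_index) const {
    return At(arg_index).value.data();
  }
  size_t ArgValLength(int64_t arg_index) const {
    return At(arg_index).value.size();
  }

 private:
  struct Arg {
    std::vector<ObjectID> ids;   // non-empty for by-reference arguments
    std::vector<uint8_t> value;  // inline bytes for by-value arguments
  };
  const Arg &At(int64_t arg_index) const {
    assert(arg_index >= 0 && arg_index < NumArgs());
    return args_[static_cast<size_t>(arg_index)];
  }
  std::vector<Arg> args_;
};

struct ArgSummary {
  std::vector<ObjectID> ids;  // every object ID referenced by any argument
  size_t inline_bytes;        // total size of the by-value arguments
};

// Walk all arguments: collect IDs from by-reference arguments and add up the
// byte lengths of by-value arguments.
ArgSummary SummarizeArgs(const ArgSpecSketch &spec) {
  ArgSummary summary;
  summary.inline_bytes = 0;
  for (int64_t i = 0; i < spec.NumArgs(); ++i) {
    if (spec.ArgByRef(i)) {
      for (int j = 0; j < spec.ArgIdCount(i); ++j) {
        summary.ids.push_back(spec.ArgId(i, j));
      }
    } else {
      summary.inline_bytes += spec.ArgValLength(i);
    }
  }
  return summary;
}
```

The ID-collecting half of this loop is the piece that a dependency getter like Task's needs from the immutable spec.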