
Ray Version 0.4.0

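The scheduling code consists of six files:

  • Scheduling policy: the header scheduling_policy.h (declares SchedulingPolicy) and the source file scheduling_policy.cc (implements the policy for SchedulingPolicy).
  • Scheduling queues: the header scheduling_queue.h (declares SchedulingQueue) and the source file scheduling_queue.cc (implements it).
  • Scheduling resources: the header scheduling_resources.h (declares ResourceSet and SchedulingResources) and the source file scheduling_resources.cc (implements them).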
src/ray/raylet/scheduling_policy.h
---------------------------------------------

namespace ray {

/// \brief Implements a scheduling policy for the node manager.
class SchedulingPolicy {
 public:
  /// \param scheduling_queue: reference to a scheduler queues object for access to
  /// tasks.
  SchedulingPolicy(const SchedulingQueue &scheduling_queue);

  /// Perform a scheduling operation, given a set of cluster resources and
  /// producing a mapping of tasks to node managers.
  ///
  /// \param cluster_resources: a set of cluster resources representing
  /// configured and current resource capacity on each node.
  /// \return Scheduling decision, mapping tasks to node managers for placement.
  std::unordered_map<TaskID, ClientID, UniqueIDHasher> Schedule(
      const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
          &cluster_resources);

  virtual ~SchedulingPolicy();

 private:
  /// An immutable reference to the scheduling task queues.
  const SchedulingQueue &scheduling_queue_;
};
}

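This header declares the SchedulingPolicy class: a constructor, a destructor, and the Schedule function, which makes the actual scheduling decision.
The constructor takes the queues of tasks to be scheduled. Schedule takes an unordered_map describing the cluster's resources and returns the scheduling decision, which is also an unordered_map. (std::unordered_map is a hash-based container added to the standard library in C++11; its elements are not kept in any particular order.)
The input map is keyed by ClientID (the ID of a node) with SchedulingResources as the value, i.e. a node -> resources mapping.
The output map is keyed by TaskID with ClientID as the value: each entry says that the task identified by the TaskID is to be placed on the node identified by the ClientID.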
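
To make the two map shapes concrete, here is a minimal sketch using stand-in types. NodeId, TaskId, Resources, MakeExampleCluster and Place are all hypothetical simplifications (plain strings and a name -> quantity map), not Ray's ClientID, TaskID or SchedulingResources.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical stand-ins for Ray's ClientID, TaskID and SchedulingResources.
using NodeId = std::string;
using TaskId = std::string;
using Resources = std::unordered_map<std::string, double>;

// Input shape of Schedule: node -> resources.
using ClusterResources = std::unordered_map<NodeId, Resources>;
// Output shape of Schedule: task -> node.
using Decision = std::unordered_map<TaskId, NodeId>;

// Builds a one-node cluster view similar in shape to what Schedule receives.
ClusterResources MakeExampleCluster() {
  ClusterResources cluster;
  cluster["node-a"] = Resources{{"CPU", 4.0}, {"GPU", 1.0}};
  return cluster;
}

// Records that `task` should be placed on `node`.
void Place(Decision &decision, const TaskId &task, const NodeId &node) {
  decision[task] = node;
}
```

A task that is never passed to Place simply has no key in the decision map, which is how "not scheduled" is represented in this shape.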

scheduling_policy.cc
----------------------------------

namespace ray {
....
std::unordered_map<TaskID, ClientID, UniqueIDHasher> SchedulingPolicy::Schedule(
    const std::unordered_map<ClientID, SchedulingResources, UniqueIDHasher>
        &cluster_resources) {
  static ClientID local_node_id = ClientID::nil();
  std::unordered_map<TaskID, ClientID, UniqueIDHasher> decision;
  // TODO(atumanov): consider all cluster resources.
  SchedulingResources resource_supply = cluster_resources.at(local_node_id);
  const auto &resource_supply_set = resource_supply.GetAvailableResources();

  // Iterate over running tasks, get their resource demand and try to schedule.
  for (const auto &t : scheduling_queue_.GetReadyTasks()) {
    // Get task's resource demand
    const auto &resource_demand = t.GetTaskSpecification().GetRequiredResources();
    bool task_feasible = resource_demand.IsSubset(resource_supply_set);
    if (task_feasible) {
      const TaskID &task_id = t.GetTaskSpecification().TaskId();
      decision[task_id] = local_node_id;
    }
  }
  return decision;
}
....
}

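For brevity, only the implementation of Schedule is shown here.
From this code, scheduling in this version of Ray goes through the following steps:
1) Get the local node's ID (here a static ClientID::nil() value).
2) Get the resources the local node currently has available.
3) For each ready task, check whether the local resources can satisfy it (task_feasible); if so, schedule it on the local node.

[Q] What about tasks that cannot be satisfied? They are never assigned local_node_id, so they have no key in decision. How these tasks are handled is left for later analysis.

[Q] Another question: a task is scheduled locally whenever the local node can satisfy its resource demand, but resource_supply is not reduced after a task is placed. Within one call to Schedule every task is checked against the same supply, so if the local node can satisfy each task individually, won't all of them be scheduled onto this node?

Here resource_demand.IsSubset(resource_supply_set) tests whether the resource demand is a subset of the resource supply; if it is, the task is feasible.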
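
The second question can be explored with a small sketch. It is not Ray's code: Resources, SimpleTask, Fits and PlaceLocally are hypothetical names, and the reserve flag is an assumption added only to contrast the two behaviors. With reserve set to false the loop mirrors the excerpt above (the supply is never reduced); with reserve set to true each placed task's demand is subtracted from a working copy of the supply.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical simplification: a resource set is a name -> quantity map.
using Resources = std::unordered_map<std::string, double>;

struct SimpleTask {
  std::string id;
  Resources demand;
};

// True if every resource in `demand` exists in `supply` in sufficient quantity.
bool Fits(const Resources &demand, const Resources &supply) {
  for (const auto &entry : demand) {
    auto it = supply.find(entry.first);
    if (it == supply.end() || entry.second > it->second) return false;
  }
  return true;
}

// Returns the ids of the tasks placed on the local node, in queue order.
// `supply` is taken by value, so the caller's copy is never modified.
std::vector<std::string> PlaceLocally(const std::vector<SimpleTask> &ready,
                                      Resources supply, bool reserve) {
  std::vector<std::string> placed;
  for (const auto &task : ready) {
    if (!Fits(task.demand, supply)) continue;  // Task gets no decision entry.
    placed.push_back(task.id);
    if (reserve) {
      // Assumption of this sketch: account for the task we just placed.
      for (const auto &entry : task.demand) supply[entry.first] -= entry.second;
    }
  }
  return placed;
}
```

With a supply of 2 CPUs and three ready tasks that each need 1 CPU, the non-reserving loop places all three, while the reserving variant places only the first two and leaves the third out of the result.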

scheduling_queue.h
--------------------------------

namespace ray {

class SchedulingQueue {
 public:
  SchedulingQueue() {}

  virtual ~SchedulingQueue() {}

  const std::list<Task> &GetWaitingTasks() const;
  const std::list<Task> &GetReadyTasks() const;
  const std::list<Task> &GetReadyMethods() const;
  const std::list<Task> &GetScheduledTasks() const;
  const std::list<Task> &GetRunningTasks() const;

  std::vector<Task> RemoveTasks(std::unordered_set<TaskID, UniqueIDHasher> tasks);

  void QueueWaitingTasks(const std::vector<Task> &tasks);
  void QueueReadyTasks(const std::vector<Task> &tasks);
  void QueueScheduledTasks(const std::vector<Task> &tasks);
  void QueueRunningTasks(const std::vector<Task> &tasks);

  /// Register an actor.
  ///
  /// \param actor_id The ID of the actor to register.
  /// \param actor_information Information about the actor.
  bool RegisterActor(ActorID actor_id, const ActorInformation &actor_information);

 private:
  std::list<Task> waiting_tasks_;
  std::list<Task> ready_tasks_;
  std::list<Task> scheduled_tasks_;
  std::list<Task> running_tasks_;
  /// The registry of known actors.
  std::unordered_map<ActorID, ActorInformation, UniqueIDHasher> actor_registry_;
};
}  // namespace ray

This header defines the scheduling queues. The class wraps several queues, each holding tasks in one state:

(1) waiting: the task is waiting for its object dependencies to become available.
(2) ready: the object dependencies are available and the task is ready to be scheduled.
(3) scheduled: the task has been scheduled but is waiting for a worker.
(4) running: the task has been scheduled and is running on a worker.
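
The four states above can be pictured as a task id moving from one list to the next. The following is a minimal sketch under that assumption, not the SchedulingQueue API: SimpleQueues and MoveTask are hypothetical names, and tasks are reduced to string ids.

```cpp
#include <list>
#include <string>

// Hypothetical simplification: one list of task ids per state.
struct SimpleQueues {
  std::list<std::string> waiting;
  std::list<std::string> ready;
  std::list<std::string> scheduled;
  std::list<std::string> running;
};

// Moves `id` from `from` to the back of `to`.
// Returns false (and changes nothing) if `id` is not in `from`.
bool MoveTask(std::list<std::string> &from, std::list<std::string> &to,
              const std::string &id) {
  for (auto it = from.begin(); it != from.end(); ++it) {
    if (*it == id) {
      // splice relinks the node instead of copying the element.
      to.splice(to.end(), from, it);
      return true;
    }
  }
  return false;
}
```

A task would pass through MoveTask(q.waiting, q.ready, id), then ready to scheduled, then scheduled to running. The real class instead exposes a RemoveTasks call plus one Queue...Tasks call per state, as the header shows.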

scheduling_queue.cc implements the getters for these queues, along with the methods for enqueuing and removing tasks.
For example, the helper used for removal:

// Helper function to remove tasks in the given set of task_ids from a
// queue, and append them to the given vector removed_tasks.
void removeTasksFromQueue(std::list<Task> &queue,
                          std::unordered_set<TaskID, UniqueIDHasher> &task_ids,
                          std::vector<Task> &removed_tasks) {
  for (auto it = queue.begin(); it != queue.end();) {
    auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
    if (task_id != task_ids.end()) {
      task_ids.erase(task_id);
      removed_tasks.push_back(std::move(*it));
      it = queue.erase(it);
    } else {
      it++;
    }
  }
}

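It removes from queue every task whose TaskID is in task_ids, appending each removed task to removed_tasks and erasing its ID from task_ids.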
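
The loop relies on the erase-while-iterating idiom: std::list::erase returns the iterator following the erased element, so the loop advances manually only in the else branch. Here is the same pattern on plain ints, as a minimal sketch; RemoveIds is a hypothetical name, not part of Ray.

```cpp
#include <list>
#include <unordered_set>
#include <vector>

// Removes from `queue` every element found in `ids` and returns the removed
// elements in queue order. Matched ids are erased from `ids`, so whatever is
// left in `ids` afterwards was not present in this queue.
std::vector<int> RemoveIds(std::list<int> &queue, std::unordered_set<int> &ids) {
  std::vector<int> removed;
  for (auto it = queue.begin(); it != queue.end();) {
    auto found = ids.find(*it);
    if (found != ids.end()) {
      ids.erase(found);
      removed.push_back(*it);
      it = queue.erase(it);  // erase returns the next valid iterator.
    } else {
      ++it;
    }
  }
  return removed;
}
```

Because matched ids are consumed, the same set can be passed over several queues in turn, and any id still left at the end was not found in any of them.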

scheduling_resources.h
------------------------------------

namespace ray {

/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
  kInfeasible,            ///< Cannot ever satisfy resource requirements.
  kResourcesUnavailable,  ///< Feasible, but not currently available.
  kFeasible               ///< Feasible and currently available.
} ResourceAvailabilityStatus;

class ResourceSet {
 public:
  ResourceSet();
  ResourceSet(const std::unordered_map<std::string, double> &resource_map);
  ~ResourceSet();

  bool operator==(const ResourceSet &rhs) const;

  bool IsEqual(const ResourceSet &other) const;

  bool IsSubset(const ResourceSet &other) const;

  bool IsSuperset(const ResourceSet &other) const;

  bool AddResource(const std::string &resource_name, double capacity);

  bool RemoveResource(const std::string &resource_name);

  bool AddResources(const ResourceSet &other);

  bool SubtractResources(const ResourceSet &other);

  // Writes the capacity of the named resource to *value. Returns true if the
  // resource is present in the set and value is not a null pointer.
  bool GetResource(const std::string &resource_name, double *value) const;

 private:
  // Map from resource name to capacity.
  std::unordered_map<std::string, double> resource_capacity_;
};

/// \class SchedulingResources
/// SchedulingResources encapsulates the state of the resources and their
/// accounting. Resources include the configured resource bundle capacity and
/// the GPU allocation map.
class SchedulingResources {
 public:
  SchedulingResources();
  SchedulingResources(const ResourceSet &total);
  ~SchedulingResources();
  // Checks whether a resource set can be satisfied. The result is one of:
  // (1) infeasible, (2) feasible but currently unavailable, or (3) available.
  ResourceAvailabilityStatus CheckResourcesSatisfied(ResourceSet &set) const;

  // Returns the currently available resources as an immutable resource set.
  const ResourceSet &GetAvailableResources() const;

  bool Release(const ResourceSet &resources);

  bool Acquire(const ResourceSet &resources);

 private:
  /// Static resource configuration.
  ResourceSet resources_total_;
  /// Dynamic resource capacity.
  ResourceSet resources_available_;
  /// gpu_map - replace with ResourceMap (for generality).
};

}  // namespace ray

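The scheduling resources header defines two classes. The first is the resource set (ResourceSet), which maintains a map from resource name to capacity, resource_capacity_, and provides equality, subset and superset checks as well as operations for adding and removing resources.
The second is the scheduling resources class (SchedulingResources), which maintains two resource sets: resources_total_, the static configuration describing the total resources a node has, and resources_available_, the resources currently available. The available amount changes over time, so this is also called the dynamic resource set. The class provides interfaces for checking whether a resource request can be satisfied, getting the currently available resources, and acquiring (Acquire) and releasing (Release) resources.
Acquire and Release operate on the available set: Acquire calls the resource set's SubtractResources, and Release calls AddResources.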
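
The total/available split can be sketched as follows. This is an illustration under stated assumptions, not the SchedulingResources implementation: SimpleNodeResources and its accessors are hypothetical, resources are a plain name -> quantity map, and the all-or-nothing check in Acquire is a choice made for this sketch.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical simplification: a resource set is a name -> quantity map.
using Resources = std::unordered_map<std::string, double>;

class SimpleNodeResources {
 public:
  explicit SimpleNodeResources(const Resources &total)
      : total_(total), available_(total) {}

  // Takes `request` out of the available pool. If any resource is missing or
  // insufficient, nothing is changed and false is returned.
  bool Acquire(const Resources &request) {
    for (const auto &entry : request) {
      auto it = available_.find(entry.first);
      if (it == available_.end() || it->second < entry.second) return false;
    }
    for (const auto &entry : request) available_[entry.first] -= entry.second;
    return true;
  }

  // Returns `request` to the available pool.
  void Release(const Resources &request) {
    for (const auto &entry : request) available_[entry.first] += entry.second;
  }

  // Currently available quantity of `name`, or 0 if the resource is unknown.
  double Available(const std::string &name) const {
    auto it = available_.find(name);
    return it == available_.end() ? 0.0 : it->second;
  }

  // Configured quantity of `name`, or 0 if the resource is unknown.
  double Total(const std::string &name) const {
    auto it = total_.find(name);
    return it == total_.end() ? 0.0 : it->second;
  }

 private:
  Resources total_;      // Static configuration; never changes.
  Resources available_;  // Shrinks on Acquire, grows on Release.
};
```

A worker starting a task would call Acquire with the task's demand and call Release with the same demand when the task finishes, so the available set returns to the total once nothing is running.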

In scheduling_resources.cc, equality of two resource sets is implemented by checking that each is a subset of the other:

bool ResourceSet::operator==(const ResourceSet &rhs) const {
  return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}

The check for whether one resource set is a subset of another is implemented as follows:

bool ResourceSet::IsSubset(const ResourceSet &other) const {
  // Check to make sure all keys of this are in other.
  for (const auto &resource_pair : resource_capacity_) {
    const auto &resource_name = resource_pair.first;
    const double lhs_quantity = resource_pair.second;
    double rhs_quantity = 0;
    if (!other.GetResource(resource_name, &rhs_quantity)) {
      // Resource not found in rhs, therefore lhs is not a subset of rhs.
      return false;
    }
    if (lhs_quantity > rhs_quantity) {
      // Resource found in rhs, but lhs capacity exceeds rhs capacity.
      return false;
    }
  }
  return true;
}
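
To see what this subset relation means in practice, here is a minimal standalone sketch on plain maps. Resources, IsSubset and IsEqual are hypothetical free functions written for illustration; they follow the same two rules as the excerpts above (every name on the left must exist on the right, with a quantity no larger) but are not the ResourceSet class itself.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical simplification: a resource set is a name -> quantity map.
using Resources = std::unordered_map<std::string, double>;

// True if every resource in `lhs` also appears in `rhs` with at least the
// same quantity. An empty `lhs` is a subset of anything.
bool IsSubset(const Resources &lhs, const Resources &rhs) {
  for (const auto &entry : lhs) {
    auto it = rhs.find(entry.first);
    if (it == rhs.end()) return false;            // Name missing on the right.
    if (entry.second > it->second) return false;  // Left quantity too large.
  }
  return true;
}

// Equality as mutual inclusion, mirroring the operator== shown earlier.
bool IsEqual(const Resources &a, const Resources &b) {
  return IsSubset(a, b) && IsSubset(b, a);
}
```

For example, a demand of {CPU: 2} is a subset of a supply of {CPU: 4, GPU: 1}, while a demand of {CPU: 2, GPU: 1} is not a subset of {CPU: 4} because GPU is missing from the supply.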

After reading the other four files, the scheduling policy implementation is much easier to follow when you come back to it.
