Ray源码解析之调度部分
总阅读次
Ray Version 0.4.0
调度部分包括六个文件:
- 调度策略:头文件
scheduling_policy.h
(定义SchedulingPolicy)和cpp文件scheduling_policy.cc
(为SchedulingPolicy实现调度策略)。 - 调度队列:头文件
scheduling_queue.h
()和cpp文件scheduling_queue.cc
()。 - 调度资源:头文件
scheduling_resources.h
()和cpp文件scheduling_resources.cc
()。
1 | src/ray/raylet/scheduling_policy.h |
调度策略的头文件,描述了SchedulingPolicy类的结构,包括构造函数,析构函数,Schedule函数(关键调度决策)。
构造函数传入需要调度的任务队列,Schedule函数接受集群资源的一个unordered_map
作为输入,输出为调度决策,也是一个unordered_map
。unordered_map
是C++ 11的新特性,其内部元素是无序的。
Schedule接受的输入map是以ClientID即本节点id为key,SchedulingResources调度资源为value,即一个节点->资源
的映射。
Schedule输出map则是以TaskID为键,ClientID为值,即表示TaskID表示的Task调度到ClientID代表的节点上。
1 | scheduling_policy.cc |
为简洁起见,这里之贴出了Schedule函数的实现。
从上述代码可以看到,ray的调度包括这么几个过程:
1)得到本地节点id
2)得到本地节点可提供的资源
3)对于每个准备好的任务,判断本地资源是否能满足该任务(task_feasible),能满足则调度到本地节点。
【问】那么不能满足的任务呢?这些任务没有对应的local_node_id
,在decision中也就没有key,这部分任务该怎么办,有待后面分析。
【问】还有一个问题就是,程序中本地节点能满足资源要求就调度到本地节点,调度后并不会减少resource_supply
即资源的供给,那么如果本地节点能满足所有任务的要求,岂不是所有任务都调度到此节点?
这里resource_demand.IsSubset(resource_supply_set)
中的IsSubset
表示资源需求是否是资源供给集的子集,如果是,表示满足条件。
1 | scheduling_queue.h |
调度队列定义头文件,封装了调度队列,每个队列包含着各自类型的任务。
(1) waiting: for object dependencies to become available,
(2) ready: object dependencies are available and the task is ready to be scheduled
(3) scheduled: the task has been scheduled but is waiting for a worker
(4) running: the task has been scheduled and is running on a worker.
scheduling_queue.cc
文件中则实现了这些队列的getter方法,以及进队列方法,移除方法。
比如说移除方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// Helper function to remove tasks in the given set of task_ids from a
// queue, and append them to the given vector removed_tasks.
void removeTasksFromQueue(std::list<Task> &queue,
std::unordered_set<TaskID, UniqueIDHasher> &task_ids,
std::vector<Task> &removed_tasks) {
for (auto it = queue.begin(); it != queue.end();) {
auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
if (task_id != task_ids.end()) {
task_ids.erase(task_id);
removed_tasks.push_back(std::move(*it));
it = queue.erase(it);
} else {
it++;
}
}
}
从queue中移除task_ids中包含的所有taskID代表的task。
1 | scheduling_resources.h |
调度资源定义中包含两个类,一个是资源集(ResourceSet),资源集维护了一个资源名->容量
的map映射resource_capacity_
,并实现了相等,资源子集和超集,添加和删除资源等接口。
另一个是调度资源类(SchedulingResources),维护了两个资源集,一个是resources_total_
总的静态资源配置,描述集群总共有哪些资源,以及一个resources_available_
可用资源集,每个时刻的可用资源量是动态变化的,所以也叫动态资源集。实现了检查资源能够满足,请求现在可用的资源,申请(Acquire)和释放(Release)等接口。
申请(Acquire)和释放(Release)接口会分别调用资源集的AddResources,RemoveResources接口。
在scheduling_resources.cc
中,资源集的相等是通过判断是否互为子集来实现的:1
2
3bool ResourceSet::operator==(const ResourceSet &rhs) const {
return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}
资源集是否为另一资源集的子集的判断函数如下实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17bool ResourceSet::IsSubset(const ResourceSet &other) const {
// Check to make sure all keys of this are in other.
for (const auto &resource_pair : resource_capacity_) {
const auto &resource_name = resource_pair.first;
const double lhs_quantity = resource_pair.second;
double rhs_quantity = 0;
if (!other.GetResource(resource_name, &rhs_quantity)) {
// Resource not found in rhs, therefore lhs is not a subset of rhs.
return false;
}
if (lhs_quantity > rhs_quantity) {
// Resource found in rhs, but lhs capacity exceeds rhs capacity.
return false;
}
}
return true;
}
看完后面4个文件再回过头去看调度策略的实现就明了多了。