文章目录

Ray Version 0.4.0

以一个Ray示例程序来说明,Ray执行多进程/分布式程序的过程。

1
2
3
4
5
6
7
8
9
10
11
import ray
import time

@ray.remote
def f():
time.sleep(1)
return 1

ray.init()
results = ray.get([f.remote() for i in range(4)])
print(results)

结果输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
Process STDOUT and STDERR is being redirected to /tmp/raylogs/.
Waiting for redis server at 127.0.0.1:35084 to respond...
Waiting for redis server at 127.0.0.1:59058 to respond...
Starting local scheduler with the following resources: {'CPU': 8, 'GPU': 0}.

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui26399.ipynb?token=7e19e5b2051ef36474500c26427d304da95835e4f0933992
======================================================================

[1, 1, 1, 1]

Process finished with exit code 0

首先我们定义remote函数f,将一个普通的Python函数变为remote函数只需在其上加上@ray.remote装饰器。

remote是一个装饰工厂函数,返回修饰函数的装饰器,主要定义代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# ray/python/ray/worker.py
def remote(*args, **kwargs):

worker = global_worker

def make_remote_decorator(num_return_vals, num_cpus, num_gpus, resources,
max_calls, checkpoint_interval, func_id=None):
# 装饰器工厂函数

# 装饰器,装饰函数,做Actor和function的区分
def remote_decorator(func_or_class):
if inspect.isfunction(func_or_class) or is_cython(func_or_class):
...
return remote_function_decorator(..) # 是函数,调用远程函数装饰器
if inspect.isclass(func_or_class):
...
return worker.make_actor(..) # 是actor,由全局worker创建一个actor
raise Exception
# 装饰器,参数是函数,起装饰函数的作用
def remote_function_decorator(func, function_properties):
def func_call(*args, **kwargs):
return _submit(args=args, kwargs=kwargs)

def _submit(...):
...
def func_executor(arguments):
"""This gets run when the remote function is executed."""
result = func(*arguments)
return result

def func_invoker(*args, **kwargs):
raise Exception
func_invoker.remote = func_call # func.remote() 直接调用func_call
func_invoker._submit = _submit
func_invoker.executor = func_executor
func_invoker.is_remote = True
func_name = "{}.{}".format(func.__module__, func.__name__)
func_invoker.func_name = func_name
...
return func_invoker

return remote_decorator
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
# 不带参数的 @ray.remote 装饰
return make_remote_decorator(
num_return_vals, num_cpus, num_gpus, resources,
max_calls, checkpoint_interval)(args[0])
else:
# 带参数的 @ray.remote(xx=x) 装饰
...
return make_remote_decorator(num_return_vals, num_cpus, num_gpus,
resources, max_calls, checkpoint_interval)

【先验知识:Python装饰器的概念】
由此可见,remote是一个通用的装饰器,可以装饰普通的Python函数,或者是Python的class。

进入remote装饰器体,首先得到全局Worker,然后定义了一个make_remote_decorator装饰器工厂函数,然后判断是无参装饰还是带参装饰。
如果是无参装饰,那么

1
2
3
4
@ray.remote
def f():
time.sleep(1)
return 1

等价于

1
2
3
4
def f():
...

f = ray.remote(f)

此时,remote的参数只有一个,那就是f本身,也即args[0]
所以上述代码返回make_remote_decorator(...)(args[0]),即调用过的make_remote_decorator,参数是f

否则remote函数定义等价于:

1
2
3
4
def f():
...

f = ray.remote(num_cpus=1, ..)(f)

所以remote装饰器返回一个未调用的,将会在f上调用的make_remote_decorator函数。

make_remote_decorator中再嵌套了一层装饰,本身提供对函数和actor的区分。

如果是函数,则进入remote_function_decorator远程函数装饰器;
否则是class,由全局worker创建一个actor。

远程函数装饰器remote_function_decorator的责任就是接受函数参数,返回一个函数,这个函数就是远程函数,不能直接传参调用(第29行)。将remote()绑定到func_call,接受参数后,提交任务(_submit_task)运行这个函数,最后得到结果,这个结果也是f.remote()调用的结果,是一个object id,因为返回结果存在object store中。

至此,远程函数就定义好了。我们在原始的普通Python函数f上,装饰了一下,得到了一个可以通过f.remote()来调用的远程函数,如此调用将会立马提交一个任务,供Ray引擎调度执行,返回结果。

下面是ray.init()过程。可以理解为初始化Ray引擎的过程,类似于启动Tensorflow的Session的过程。

ray.init()也有带参版本和无参版本。
带参版本用于已经存在并启动一个Ray集群的情况下,直接填入该集群的redis地址,即可连接到集群,就初始化好了。
无参版本适用于单机多进程的运行,这种情况下会创建一个Ray环境,默认启动一个local scheduler,一个global scheduler,一个或多个redis server, 一个object store和一个object store manager,和若干worker进程(默认为CPU核数个)。

init()主要逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
# ray/python/ray/worker.py
init()
_init()
if PYTHON_MODE:
pass
elif start_ray_local: # 本地开启一个Ray主节点进程
address_info = services.start_ray_head(..)
else: # 连接到已有集群
address_info = get_address_info_from_redis(redis_address, node_ip_address)
# 将全局worker连接到 local scheduler, Plasma 和 Redis
connect(driver_address_info, object_id_seed=object_id_seed,
mode=driver_mode, worker=global_worker)

四个模式:

1
2
3
4
SCRIPT_MODE:如果Worker是driver,且由Python脚本启动或者在shell中交互式运行的话,使用脚本模式。会打印任务失败信息。
WORKER_MODE:如果Worker不是driver,只是slave的话,启动WORKER_MODE,不打印关于task的任何信息。
PYTHON_MODE:如果要顺序运行或是调试,可以使用PYTHON_MODE,此时的Worker即是driver。此模式下,不会发送remote函数到调度器,而是直接以阻塞的形式执行。
SILENT_MODE:测试的时候使用SILENT_MODE。不会打印error信息,因为许多测试时故意失败的。

我们的示例代码中,ray.init()是无参的,代表我们会在本地开启一个ray head节点进程。
此部分代码简要逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# ray/python/ray/services.py
start_ray_head
| start_ray_processes
| print("Process STDOUT and STDERR is being redirected to /tmp/raylogs/.") # 程序输出中第一行的来源
| if redis_address is None:
| start_redis(...)
| start_redis_instance(..)
| 创建redis_shards个redis server
# 等待redis server可用并响应,程序输出第2,3行的来源
| wait_for_redis_to_start("127.0.0.1", port)
| if include_log_monitor:
| start_log_monitor(..)
| if include_global_scheduler:
| start_global_scheduler(...)
# 开启local_scheduler并打印 Starting local scheduler ..,程序第4行的来源
| local_scheduler_name, pid = ray.local_scheduler.start_local_scheduler(...)
| for i in range(num_local_schedulers - len(object_store_addresses)):
| start_objstore(...)
| ray.plasma.start_plasma_store(..)
| ray.plasma.start_plasma_manager(..)
| for i in range(len(local_scheduler_socket_names), num_local_schedulers):
| start_local_scheduler(...)
# 每个local scheduler默认搭配CPU核数个workers,因此workers_per_local_scheduler[i] = #cpus
| for i, num_local_scheduler_workers in enumerate(workers_per_local_scheduler):
| for j in range(num_local_scheduler_workers):
| start_worker(...)
| if include_webui:
| start_ui(...)
# 开启UI会打印输出中UI的部分

可以看到,start_ray_head的过程配套启动了redis, global scheduler, local scheduler及其workers,UI等。
这些都是ray执行快速的分布式任务分发的基本组件,其中redis用来存储全局系统状态,global scheduler和local scheduler分数两级调度器,负责快速的任务调度,workers负责执行远程函数,UI负责观察运行状态,不过目前UI做的还比较简陋。

每个Worker执行一个主循环main_loop,循环不断地接受任务,处理任务返回……
这部分代码见ray/python/ray/workers/default_worker.py
main_loop的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ray/python/ray/worker.py
def main_loop(self):
def exit(signum, frame):
cleanup(worker=self)
sys.exit(0)
signal.signal(signal.SIGTERM, exit)
check_main_thread()
while True:
# 此处调用self.local_scheduler_client.get_task()获得任务
task = self._get_next_task_from_local_scheduler()
self._wait_for_and_process_task(task)
| self._wait_for_function(function_id, task.driver_id().id())
| with self.lock:
| self._process_task(task)

初始化好以后,就可以运行f.remote()了,运行后还是回到装饰器里面的_submit_task函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ray/python/ray/worker.py
def _submit(args=None, kwargs=None, num_return_vals=None,
num_cpus=None, num_gpus=None, resources=None):
check_connected() # 检查worker是否连接
check_main_thread() # 检查是否主线程,不允许非主线程提交任务
kwargs = {} if kwargs is None else kwargs
args = signature.extend_args(function_signature, args, kwargs)

if _mode() == PYTHON_MODE:
# PYTHON模式下,并不提交任务,而是串行执行,拷贝参数以防修改
result = func(*copy.deepcopy(args))
return result
# 提交任务,返回结果的object id或者一组object ids
object_ids = _submit_task(function_id, args,
num_return_vals=num_return_vals,
num_cpus=num_cpus, num_gpus=num_gpus,
resources=resources)
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids

代码中调用的_submit_task是对worker.submit_task的一个封装:

1
2
3
4
5
6
7
8
9
10
# ray/python/ray/worker.py
def _submit_task(function_id, *args, **kwargs):
"""This is a wrapper around worker.submit_task.

We use this wrapper so that in the remote decorator, we can call
_submit_task instead of worker.submit_task. The difference is that when we
attempt to serialize remote functions, we don't attempt to serialize the
worker object, which cannot be serialized. 【这样搞一下就不需要序列化worker对象了?】
"""

return global_worker.submit_task(function_id, *args, **kwargs)

最终,Worker的submit_task函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# ray/python/ray/worker.py
def submit_task(self, function_id, args, ...):
with log_span("ray:submit_task", worker=self):
check_main_thread()
...
# 将参数put进object store,注意,如果多个函数使用的是相同的输入,直接调用的话仍然会put多次
# 一个方法是先在调用前put参数,然后传入put后的ObjectID对象。
args_for_local_scheduler = []
for arg in args:
if isinstance(arg, ray.local_scheduler.ObjectID):
args_for_local_scheduler.append(arg)
elif isinstance(arg, ray.actor.ActorHandleParent):
args_for_local_scheduler.append(put(
ray.actor.wrap_actor_handle(arg)))
elif ray.local_scheduler.check_simple_value(arg):
args_for_local_scheduler.append(arg)
else:
args_for_local_scheduler.append(put(arg))
...
# Submit the task to local scheduler.
task = ray.local_scheduler.Task(
self.task_driver_id,
ray.local_scheduler.ObjectID(function_id.id()),
args_for_local_scheduler,
...)
...
self.task_index += 1
self.local_scheduler_client.submit(task)

return task.returns()

也就是说,[f.remote() for i in range(4)]这一句,默认的全局worker会提交4个任务给local scheduler,然后local scheduler将这些任务调度到嗷嗷待哺的各个worker,前面的代码说过,默认会启动CPU核数个Worker。

运行完毕后,列表中就是返回的值的object id,我们需要使用ray.get(id)从object store中将真正的数据拿出来。

最后就成了[1, 1, 1, 1],程序到此就结束了。

再回顾一下整个过程:

1
2
3
4
5
6
7
8
@ray.remote   # 装饰器
def f():
time.sleep(1)
return 1
# 装饰完成,装饰过后的远程函数f已形成
ray.init() # 初始化Ray引擎,会启动各个必要组件,包括调度,状态存储,对象存储和workers等
results = ray.get([f.remote() for i in range(4)]) # 提交任务,获得结果,从object store中取出
print(results)

Happy Reading!

源码阅读 | Source