ray -- Python Distributed Computing Framework
文章目录
教程
快速入门:
基准测试
python 单进程、multiprocessing 和 Ray 性能比较
- cpu 核数越多 Ray 的优势越明显
注意事项
- 参考:Tips for first-time users — Ray v2.0.0.dev0
条目
避免小任务使用 ray task
- ray task 本身的消耗就要 0.5ms
合理做法
- 因此,task 要在 几毫秒以上
(>=3ms)时使用 ray task 才合适
- 因此,task 要在 几毫秒以上
避免传递重复传递对象 object(特别是大对象)
因为,在
ray_fun.remote(object)调用时,自动使用 ray.put- 这种重复操作时间消耗巨大
合理做法
- 先用 obj_id = ray.put(ojbect), 存储 object
- ray_fun.remote(obj_id), ray 函数调用时,直接使用 ObjectRef 对象
Pipeline, 流程化任务
- 不要等处理完成所有任务后,再做下一步处理
合理做法
- 处理完成一个,传入下一步一个
例子
参考:Tips for first-time users — Ray v2.0.0.dev0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22import time import random import ray ray.init(num_cpus = 4) @ray.remote def do_some_work(x): time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do. return x def process_incremental(sum, result): time.sleep(1) # Replace this with some processing code. return sum + result start = time.time() result_ids = [do_some_work.remote(x) for x in range(4)] sum = 0 while len(result_ids): done_id, result_ids = ray.wait(result_ids) sum = process_incremental(sum, ray.get(done_id[0])) print("duration =", time.time() - start, "\nresult = ", sum)
子 worker: Module Not Found 找不到模块问题
解决方法
在主进程中修改 PYTHONPATH 环境变量
注意
- 在 ray.init 之前
例子
1 2 3 4 5import os os.environ['PYTHONPATH'] = '/mnt/d/source/cropper/' import ray ray.init(ignore_reinit_error=True)
task 和 actor
@ray.remote
- 修饰函数,函数 –> 变成 –> task
- 修饰类, 类 –> 变成 –> actor
@ray.remote
- 修饰以可以远程调用
修饰函数
代码
1 2 3 4 5 6 7 8 9 10 11 12import ray ray.init() @ray.remote def f(x): return x * x # 共享内存地址获取 futures = [f.remote(i) for i in range(4)] # 取回结果 -> list 对象 print(ray.get(futures)) # [0, 1, 4, 9]作用
- 修饰编程 ray.remote_function.RemoteFunction 类对象
使用:
fun.remote(*args, **kwargs)- our_fun_name.remote(调用参数, …)
特征
fun.remote()
返回
- 立刻返回一个 future(
ObjectRef对象), 对运行结果的引用
- 立刻返回一个 future(
任务
- 返回后,在后台运行任务 task
修饰类
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25import ray ray.init() # Only call this once. @ray.remote class Counter(object): def __init__(self): self.n = 0 def increment(self): self.n += 1 def read(self): return self.n # 创建对象 counters = [Counter.remote() for i in range(4)] # 调用对象的方法 [c.increment.remote() for c in counters] # 获取共享内存地址 futures = [c.read.remote() for c in counters] # ***注意: 结果在共享内存 print(ray.get(futures)) # [1, 1, 1, 1]作用
- 修饰类,以可以远程调用
使用
- method_name.remote(*args, **kwargs)
对象共享
底层工具
- Apache Arrow data layout 数据布局
- Plasma store zero-copy serialization 零拷贝
- 参考:Fast Python Serialization with Ray and Apache Arrow | Ray: A fast and simple …
总之
- 相比于 mutiprocessing 使用的 pickle 序列化 + 跨进程对象传递,速度快很多
- pyarrow 调用 Apache Arrow
ray.put
作用
存储对象到共享内存
/dev/shm中- 已经存储过的对象,不会存储再存储一次
- 返回引用地址 ObjectRef 对象
ray.get
作用
- 获取对象,使用地址 ObjectRef
参数
object_refs
- 单个 ObjectRef 或 List[ObjectRef]
timeout
- 超时
返回值
一个 Python 对象
- 单个值
- 或者 一个 Python 列表
调用
fun.remote()
- 可以直接接受 ObjectRef 对象,内部会自动转换成 Python 对象
- 不必在 fun 内部调用
ray.get
例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16import numpy as np @ray.remote def create_matrix(size): return np.random.normal(size=size) @ray.remote def multiply_matrices(x, y): return np.dot(x, y) x_id = create_matrix.remote([1000, 1000]) y_id = create_matrix.remote([1000, 1000]) z_id = multiply_matrices.remote(x_id, y_id) # Get the results. z = ray.get(z_id)
任务依赖 Task dependencies
解说
- 当 fun2.remote() 依赖于 fun1.remote() 的调用结果时,形成依赖
- fun2() 会在 fun1() 运行完成后再运行
注意
- 传入 remote 函数(task)内部时 RefObject 自动被替换成 PyObject 原对象
但是 ,在非函数传参时
- RefObject 被使用时,不会自动被替换成 PyObject
- 必须手动 ray.get() 转换
代码例子
| |
测试证明
reduce 简单操作,使用
Ray速度更慢- 原因分析:Ray 需要在
不同进程间做对象拷贝,消耗太高
- 原因分析:Ray 需要在
结论
- 小对象,简单操作,不适合 Ray 并行运算
启动(cluster) 和 连接(client)
调用工具
ray.init
位置参数
address: str
- 解说:cluster ip 地址
取值
- ip
'auto'
自动探测机器地址
前提条件:当前机器位于 ray cluster
None自动启动一个 ray 相关程序
相关程序
- Redis
- raylet
- plasma store
- plasma manager
- workers
- 脚本退出,自动关闭 ray 相关程序
关键字参数
num_cpu: int
- 默认值:逻辑 cpu 数量(包括物理线程数)
- num_gpu
启动
python 程序启动
ray.init- 不加 address 位置参数
命令行启动
1ray start --head --port=6379
连接
ray.init
- 加上 位置参数 address
单机避免重复启动
步骤
- 命令行启动
ray start ... - python 调用
ray.init('auto', ...)
- 命令行启动
资源状况获取
方法
- ray.available_resources()
- ray.cluster_resources()
- ray.nodes()
例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19In [2]: ray.init(num_cpus=3) 2021-08-24 17:00:51,418 INFO services.py:1245 -- View the Ray dashboard at http://127.0.0.1:8266 Out[2]: {'node_ip_address': '172.20.249.211', 'raylet_ip_address': '172.20.249.211', 'redis_address': '172.20.249.211:62319', 'object_store_address': '/tmp/ray/session_2021-08-24_17-00-50_309916_10807/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2021-08-24_17-00-50_309916_10807/sockets/raylet', 'webui_url': '127.0.0.1:8266', 'session_dir': '/tmp/ray/session_2021-08-24_17-00-50_309916_10807', 'metrics_export_port': 55564, 'node_id': 'a045de728a3828a979f2d42620edc39d7a981854d402e6ff89ebe617'} In [3]: ray.available_resources() Out[3]: {'object_store_memory': 6036793344.0, 'memory': 12073586688.0, 'node:172.20.249.211': 1.0, 'CPU': 3.0}
调用算法
reduce 实现
使用场景
纯数值运算
- functools.reduce 性能更好
非数值运算
- agg_list 更快 o(LogN)
单步运算耗 CPU 情况 or 非数值运算数据量太大
agg_list_with_ray 更快
- 使用
ray进行 Parallel Computing
- 使用
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61from functools import reduce from typing import List, Callable, TypeVar T = TypeVar('T') import ray def agg_list(agg_fun: Callable[[T, T], T], seq: List[T]) -> T: """Concat nested list, eg: [["type1", "type2"], ["type1"], ...] """ if len(seq) == 0: raise ValueError('Need input sequence length >= 1') while len(seq) >= 2: new_list = [] for i in range(len(seq)//2): new_list.append(agg_fun(seq[i*2], seq[i*2+1])) if len(seq) % 2 != 0: new_list.append(seq[-1]) seq = new_list return seq[0] def agg_list_with_ray(agg_fun: Callable[[T, T], T], seq: List[T]) -> T: """Concat nested list, eg: [["type1", "type2"], ["type1"], ...] """ agg_list_remote = ray.remote(agg_list) worker_count = ray.available_resources().get('CPU') if len(seq) <= worker_count*10: return agg_list(agg_fun, seq) futures = [] span = min(400, int(len(seq) // worker_count)) end = int(len(seq)//span) for i in range(end): if i == end - 1: load = seq[-span:] else: load = seq[span*i: span*(i+1)] futures.append( agg_list_remote.remote(agg_fun, load) ) return agg_list(agg_fun, ray.get(futures)) assert agg_list_with_ray(lambda x,y: x+y, list(range(10000))) == reduce(lambda x,y: x+y, list(range(10000))) #----------- 性能测试 --------------------- test_list = [[1], [2]]*100000 %time _= agg_list_with_ray(lambda x, y: x+y, test_list) #CPU times: user 260 ms, sys: 199 ms, total: 459 ms #Wall time: 165 ms %time _ = agg_list(lambda x, y: x+y, test_list) #CPU times: user 219 ms, sys: 0 ns, total: 219 ms #Wall time: 213 ms %time _ = reduce(lambda x, y: x+y, test_list) # CPU times: user 40.1 s, sys: 0 ns, total: 40.1 s # Wall time: 39 s
文章作者
上次更新 2024-01-05 (5c92d1c)