教程

注意事项

  • 参考:Tips for first-time users — Ray v2.0.0.dev0
  • 条目

    1. 避免小任务使用 ray task

      • ray task 本身的消耗就要 0.5ms
      • 合理做法

        • 因此,task 要在 几毫秒以上 (>=3ms) 时使用 ray task 才合适
    2. 避免传递重复传递对象 object(特别是大对象)

      • 因为,在 ray_fun.remote(object) 调用时,自动使用 ray.put

        • 这种重复操作时间消耗巨大
      • 合理做法

        • 先用 obj_id = ray.put(ojbect), 存储 object
        • ray_fun.remote(obj_id), ray 函数调用时,直接使用 ObjectRef 对象
    3. Pipeline, 流程化任务

      • 不要等处理完成所有任务后,再做下一步处理
      • 合理做法

        • 处理完成一个,传入下一步一个
      • 例子

        • 参考:Tips for first-time users — Ray v2.0.0.dev0

           1
           2
           3
           4
           5
           6
           7
           8
           9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          
          import time
          import random
          import ray
          
          ray.init(num_cpus = 4)
          
          @ray.remote
          def do_some_work(x):
              time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.
              return x
          
          def process_incremental(sum, result):
              time.sleep(1) # Replace this with some processing code.
              return sum + result
          
          start = time.time()
          result_ids = [do_some_work.remote(x) for x in range(4)]
          sum = 0
          while len(result_ids):
              done_id, result_ids = ray.wait(result_ids)
              sum = process_incremental(sum, ray.get(done_id[0]))
          print("duration =", time.time() - start, "\nresult = ", sum)

子 worker: Module Not Found 找不到模块问题

  • 解决方法

    • 在主进程中修改 PYTHONPATH 环境变量

      • 注意

        • 在 ray.init 之前
    • 例子

      1
      2
      3
      4
      5
      
      import os
      os.environ['PYTHONPATH'] = '/mnt/d/source/cropper/'
      
      import ray
      ray.init(ignore_reinit_error=True)

task 和 actor

  • @ray.remote

    • 修饰函数,函数 –> 变成 –> task
    • 修饰类, 类 –> 变成 –> actor

@ray.remote

  • 修饰以可以远程调用

修饰函数

  • 代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    import ray
    ray.init()
    
    @ray.remote
    def f(x):
        return x * x
    
    # 共享内存地址获取
    futures = [f.remote(i) for i in range(4)]
    
    # 取回结果 -> list 对象
    print(ray.get(futures)) # [0, 1, 4, 9]
  • 作用

    • 修饰编程 ray.remote_function.RemoteFunction 类对象
  • 使用: fun.remote(*args, **kwargs)

    • our_fun_name.remote(调用参数, …)
  • 特征

    • fun.remote()

      • 返回

        • 立刻返回一个 future( ObjectRef 对象), 对运行结果的引用
      • 任务

        • 返回后,在后台运行任务 task

修饰类

  • 代码

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    import ray
    ray.init() # Only call this once.
    
    @ray.remote
    class Counter(object):
        def __init__(self):
            self.n = 0
    
        def increment(self):
            self.n += 1
    
        def read(self):
            return self.n
    
    # 创建对象
    counters = [Counter.remote() for i in range(4)]
    
    # 调用对象的方法
    [c.increment.remote() for c in counters]
    
    # 获取共享内存地址
    futures = [c.read.remote() for c in counters]
    
    # ***注意: 结果在共享内存
    print(ray.get(futures)) # [1, 1, 1, 1]
  • 作用

    • 修饰类,以可以远程调用
  • 使用

    • method_name.remote(*args, **kwargs)

对象共享

底层工具

ray.put

  • 作用

    • 存储对象到共享内存 /dev/shm

      • 已经存储过的对象,不会存储再存储一次
    • 返回引用地址 ObjectRef 对象

ray.get

  • 作用

    • 获取对象,使用地址 ObjectRef
  • 参数

    • object_refs

      • 单个 ObjectRef 或 List[ObjectRef]
    • timeout

      • 超时
  • 返回值

    • 一个 Python 对象

      • 单个值
      • 或者 一个 Python 列表
  • 调用

    • fun.remote()

      • 可以直接接受 ObjectRef 对象,内部会自动转换成 Python 对象
      • 不必在 fun 内部调用 ray.get
    • 例子

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      
      import numpy as np
      
      @ray.remote
      def create_matrix(size):
          return np.random.normal(size=size)
      
      @ray.remote
      def multiply_matrices(x, y):
          return np.dot(x, y)
      
      x_id = create_matrix.remote([1000, 1000])
      y_id = create_matrix.remote([1000, 1000])
      z_id = multiply_matrices.remote(x_id, y_id)
      
      # Get the results.
      z = ray.get(z_id)

任务依赖 Task dependencies

  • 解说

    • 当 fun2.remote() 依赖于 fun1.remote() 的调用结果时,形成依赖
    • fun2() 会在 fun1() 运行完成后再运行
  • 注意

    • 传入 remote 函数(task)内部时 RefObject 自动被替换成 PyObject 原对象
    • 但是 ,在非函数传参时

      • RefObject 被使用时,不会自动被替换成 PyObject
      • 必须手动 ray.get() 转换

代码例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from typing import List, Callable, TypeVar
T = TypeVar('T')

import ray
ray.init()

# o(logN) reduce 算法:  Ray 实现
def agg_list(seq: List[T], agg_fun: Callable[[T, T], T]) -> T:
    if len(seq) == 0:
        raise ValueError('Need input sequence length >= 1')
    while len(seq) >= 2:
        new_list = []
        for i in range(len(seq)//2):
            new_list.append(agg_fun.remote(seq[i*2], seq[i*2+1]))
        new_list = ray.get(new_list)
        if len(seq) % 2 != 0:
            new_list.append(seq[-1])
        seq = new_list
    return seq[0]


# o(logN) reduce 算法: 普通Python 实现
def agg_list_no_ray(seq: List[T], agg_fun: Callable[[T, T], T]) -> T:
    if len(seq) == 0:
        raise ValueError('Need input sequence length >= 1')
    while len(seq) >= 2:
        new_list = []
        for i in range(len(seq)//2):
            new_list.append(agg_fun(seq[i*2], seq[i*2+1]))
        if len(seq) % 2 != 0:
            new_list.append(seq[-1])
        seq = new_list
    return seq[0]



# List[list] 粘贴到一起
@ray.remote
def ray_concat(x, y):
    return x + y

def concat_no_ray(x, y):
    return x + y


# 基准测试
In [1]: %time _ = agg_list([[1, 2, 3]]*int(1e4*3), ray_concat)

CPU times: user 6.23 s, sys: 1.51 s, total: 7.73 s
Wall time: 3.33 s

In [2]: %time _ = agg_list_no_ray([[1, 2, 3]]*int(1e4*3), ray_concat_no_ray)

CPU times: user 12.5 ms, sys: 0 ns, total: 12.5 ms
Wall time: 12.3 ms
  • 测试证明

    • reduce 简单操作,使用 Ray 速度更慢

      • 原因分析:Ray 需要在 不同进程间 做对象拷贝,消耗太高
  • 结论

    • 小对象,简单操作,不适合 Ray 并行运算

启动(cluster) 和 连接(client)

  • 调用工具

    • ray.init

      • 位置参数

        • address: str

          • 解说:cluster ip 地址
          • 取值

            • ip
            • 'auto'

              • 自动探测机器地址

                • 前提条件 :当前机器位于 ray cluster
            • None

              1. 自动启动一个 ray 相关程序

                • 相关程序

                  • Redis
                  • raylet
                  • plasma store
                  • plasma manager
                  • workers
              2. 脚本退出,自动关闭 ray 相关程序
  • 关键字参数

    • num_cpu: int

      • 默认值:逻辑 cpu 数量(包括物理线程数)
    • num_gpu

启动

  • python 程序启动

    • ray.init

      • 不加 address 位置参数
  • 命令行启动

    1
    
    ray start --head --port=6379

连接

  • ray.init

    • 加上 位置参数 address

单机避免重复启动

  • 步骤

    1. 命令行启动 ray start ...
    2. python 调用 ray.init('auto', ...)

资源状况获取

  • 方法

    • ray.available_resources()
    • ray.cluster_resources()
    • ray.nodes()
  • 例子

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    In [2]: ray.init(num_cpus=3)
    2021-08-24 17:00:51,418 INFO services.py:1245 -- View the Ray dashboard at http://127.0.0.1:8266
    Out[2]:
    {'node_ip_address': '172.20.249.211',
     'raylet_ip_address': '172.20.249.211',
     'redis_address': '172.20.249.211:62319',
     'object_store_address': '/tmp/ray/session_2021-08-24_17-00-50_309916_10807/sockets/plasma_store',
     'raylet_socket_name': '/tmp/ray/session_2021-08-24_17-00-50_309916_10807/sockets/raylet',
     'webui_url': '127.0.0.1:8266',
     'session_dir': '/tmp/ray/session_2021-08-24_17-00-50_309916_10807',
     'metrics_export_port': 55564,
     'node_id': 'a045de728a3828a979f2d42620edc39d7a981854d402e6ff89ebe617'}
    
    In [3]: ray.available_resources()
    Out[3]:
    {'object_store_memory': 6036793344.0,
     'memory': 12073586688.0,
     'node:172.20.249.211': 1.0,
     'CPU': 3.0}

调用算法

reduce 实现

  • 使用场景

    • 纯数值运算

      • functools.reduce 性能更好
    • 非数值运算

      • agg_list 更快 o(LogN)
    • 单步运算耗 CPU 情况 or 非数值运算数据量太大

      • agg_list_with_ray 更快

        • 使用 ray 进行 Parallel Computing
  • 代码实现

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    
    from functools import reduce
    from typing import List, Callable, TypeVar
    T = TypeVar('T')
    
    import ray
    
    
    def agg_list(agg_fun: Callable[[T, T], T], seq: List[T]) -> T:
        """Concat nested list, eg: [["type1", "type2"], ["type1"], ...]
        """
        if len(seq) == 0:
            raise ValueError('Need input sequence length >= 1')
        while len(seq) >= 2:
            new_list = []
            for i in range(len(seq)//2):
                new_list.append(agg_fun(seq[i*2], seq[i*2+1]))
            if len(seq) % 2 != 0:
                new_list.append(seq[-1])
            seq = new_list
        return seq[0]
    
    
    def agg_list_with_ray(agg_fun: Callable[[T, T], T], seq: List[T]) -> T:
        """Concat nested list, eg: [["type1", "type2"], ["type1"], ...]
        """
        agg_list_remote = ray.remote(agg_list)
        worker_count = ray.available_resources().get('CPU')
        if len(seq) <= worker_count*10:
            return agg_list(agg_fun, seq)
    
        futures = []
        span = min(400, int(len(seq) // worker_count))
        end = int(len(seq)//span)
        for i in range(end):
            if i == end - 1:
                load = seq[-span:]
            else:
                load = seq[span*i: span*(i+1)]
    
            futures.append(
                agg_list_remote.remote(agg_fun, load)
            )
        return agg_list(agg_fun, ray.get(futures))
    
    assert agg_list_with_ray(lambda x,y: x+y, list(range(10000))) == reduce(lambda x,y: x+y, list(range(10000)))
    
    
    #----------- 性能测试 ---------------------
    test_list = [[1], [2]]*100000
    
    %time _= agg_list_with_ray(lambda x, y: x+y, test_list)
    #CPU times: user 260 ms, sys: 199 ms, total: 459 ms
    #Wall time: 165 ms
    
    %time _ = agg_list(lambda x, y: x+y, test_list)
    #CPU times: user 219 ms, sys: 0 ns, total: 219 ms
    #Wall time: 213 ms
    
    %time _ = reduce(lambda x, y: x+y, test_list)
    # CPU times: user 40.1 s, sys: 0 ns, total: 40.1 s
    # Wall time: 39 s