工具收集

参考:

消息循环工具

  1. libuv + uvloop

asyncio 的类型替换框架

  1. trio
  2. curio
  3. anyio

    • 支持方便切换使用的消息循环实现(uvloop)
    • 可以在 asyncio 和 trio 之间任意切换

工具 library

  1. 文件读写

    • aiofiles
  2. http 请求

    • aiohttp

      • 专注 asyncio 下的 http 处理
      • 性能更好,相比于 httpx
    • httpx

      • asycio + sync
      • 同步编程 + 异步编程都支持
      • 支持 httpv2

数据库 + asyncio

  1. sqlalchemy + asyncio

  2. postgresql + asyncpg
  3. sqlite + aiosqlite

英语名词

Wordsexplanation
primeVt. 使……做好准备
coroutines must be primed.

generator

  • 是一个函数
  • 但是调用后返回的不是结果
  • 而是一个 iterator
1
2
3
4
5
def producer(n):
    i = 0
    while i < n:
        yield i
        i -= 1

generator 用途

pipeline 组件

可以把一系列 generator, 组合完成逐个步骤

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import time


def follow(thefile):
    thefile.seek(0, 2)  # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)  # Sleep briefly
            continue
        yield line


        def grep(pattern, lines):
            for line in lines:
                if pattern in line:
                    yield line

yield

作用

抛出值

像上面 generator 中一样

获取外界传入的值

1
2
3
4
5
6
def grep(pattern):
    print "Looking for %s" % pattern
    while True:
        line = (yield)
        if pattern in line:
            print line,
  • 这里 yield, 可以把从外界接收的值传给 line

    • 方法:

      1
      2
      3
      4
      
      gen = grep('One')
      gen.next()  # or gen.send(None)
      gen.send('One line')   # --> print out
      gen.send('Second line')  # --> print nothing
用在 coroutine 中,启动方法(priming)
  1. gen.send(None)
  2. next(gen) 或 gen.next()

Coroutine

Generator 实现

启动

  • generator 必须被预处理准备一下(be primed)

    • 启动方法:gen.send(None) 或 next(gen) [in python3]
    • 作用:让代码运行到第一次出现 yield 的地方
  • 这样才能接受外面传入的数据

关闭

  • 关闭方法:gen.close()
  • 原因:避免 generator 无限运行下去
  • 注意:python 垃圾回收器 Garbage collection 也能关闭,通过 close()
close()可以被 generator 捕捉到
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@coroutine
def grep(pattern):
    print "Looking for %s" % pattern
    try:
        while True:
            line = (yield)
            if pattern in line:
                print line,
    except GeneratorExit:
        print "Going away. Goodbye"

抛出异常

  • 方法:gen.throw(RuntimeError, 'It is over.')
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
>>> g = grep("python")
>>> g.next() # Prime it
Looking for python
>>> g.send("python generators rock!")
python generators rock!
>>> g.throw(RuntimeError,"You're hosed")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in grep
RuntimeError: You're hosed
>>>

辨析

  • Coroutine 不是 generator

    • 我们注意到,generator 只产生值
    • Coroutine, 同时产生和接收值
    • Coroutine, generator 要进行启动
  • 角色不同

    • Coroutine 是消费者
    • Generator 是生产者
  • 是否迭代

    • Generator 是连续的迭代
    • Coroutine 不是迭代

      • 它是接收数据,再处理

使用场景

Pipelines
Filters

协程本质

实现异步的本质

  • 通过 gen.send() 来实现
  • 把协程分成 n 次 send() 调用

    • 这样,可以随时通过调用不同协程的 send()
    • 来实现运行的协程之间的切换

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      
      gen1 = Gen1()
      gen2 = Gen2()
      
      gen1.send(None)
      gen2.send(None)
      
      # * 异步切换
      gen1.send('x')
      gen2.send('y')
      
      gen1.send('x1')
      gen2.send('y1')
      
      # * 同步顺序执行
      gen1.send('x')
      gen1.send('x1')
      
      gen2.send('y')
      gen2.send('y1')

协程的工具模块

https://blog.csdn.net/weixin_41599977/article/details/93656042

    • IO 耗时操作

      • 网络请求
      • 文件读写
      • asyncio.sleep 模拟

asyncio

python 标准库

greenlet

手动切换协程工具

  • 使用 switch() 函数切换
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from greenlet import greenlet
import time

def task_1():
    while True:
        print("--This is task 1!--")
        g2.switch()  # 切换到g2中运行
        time.sleep(0.5)

def task_2():
    while True:
        print("--This is task 2!--")
        g1.switch()  # 切换到g1中运行
        time.sleep(0.5)

if __name__ == "__main__":
    g1 = greenlet(task_1)  # 定义greenlet对象
    g2 = greenlet(task_2)

    g1.switch()  # 切换到g1中运行

gevent

自动完成对 IO 操作的自动切换

  • 使用 join()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import gevent

def task_1(num):
    for i in range(num):
        print(gevent.getcurrent(), i)
        gevent.sleep(1)  # 模拟一个耗时操作,注意不能使用time模块的sleep

if __name__ == "__main__":
    g1 = gevent.spawn(task_1, 5)  # 创建协程
    g2 = gevent.spawn(task_1, 5)
    g3 = gevent.spawn(task_1, 5)

    g1.join()  # 等待协程运行完毕
    g2.join()
    g3.join()

asyncio 模块

  • 以下假设 hello() 会生成一个 coroutine 对象

单个 coroutine

asyncio.run

  • 直接运行单个 coroutine
  • 支持版本:python3.7
1
asyncio.run(hello())

loop

消息循环

run_until_complete()

接受类型
  • 单个 coroutine

    1
    2
    
    loop = aysncio.get_event_loop()
    loop.run_until_complete(hello())
  • 把 coroutine 包装成 Task 对象

    • asyncio.Future 的子类,能保存 coroutine 的运行状态,和返回值

      1
      2
      3
      
      # task = loop.create_task(hello())
      task = loop.ensure_future(hello())
      loop.run_until_complete(task)
  • awaitable

    1
    2
    3
    
    # hello() 会生成一个 coroutine 对象
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))

task

创建 task

https://stackoverflow.com/a/36415477

loop.create_task()
  • 不同类型的 loop, 实现方式不同,类似被重载了
  • 自行实现时使用
  • 来源:子类 Task
  • 版本:>= python3.7
  • 接受参数类型

    • coroutine –> Task
loop.ensure_future()
  • 行为,由输入参数来决定
  • 来源:基类 Future
  • 推荐使用
  • 版本:支持老版本
  • 接受参数类型

    • coroutine —> Task
    • awaitable —> Task
    • Future object —> Future
    • 可以看出

      • ensure_future 接受类型更广泛

添加 call_back 回调函数

  • task.add_done_callback

    • 接口

      • call_back(future)

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        
        import asyncio
        
        async def work(x):
        for _ in range(3):
            print('Work {} is running ..'.format(x))
        return "Work {} is finished".format(x)
        
        def call_back(future):
        print("Callback: {}".format(future.result()))
        
        coroutine = work(1)
        loop = asyncio.get_event_loop()
        task = asyncio.ensure_future(coroutine)
        task.add_done_callback(call_back)
        loop.run_until_complete(task)

概念辨析

coroutine

  • coroutine function

    • 用于创建 coroutine
  • coroutine object

awaitable

  • 包括类型

    • coroutine object
    • Task
    • Future

Task

  • 被安排(scheduled)即将执行的 coroutine

Future

  • low-level awaitable object

定时任务

类型

  • 延后触发
  • 固定时间触发

实现方法

注意

  • 延迟失效问题

    • 当消息循环中,没有别的可执行协程 coro 任务
  • 即:待执行协程 coro 是唯一的任务时

    • 这时,延迟功能失效,给定协程会被立即执行

标准库实现 asyncio

  • loop.create_task(coroutine)

    • 需要 coroutine 自己管理延迟
  • loop.ensure_future(coro)

    • 类似 create_task
  • loop.call_soon(callback, args, …)

    • 下一次 loop 循环时立刻执行
    • 注意:
  • callback 是回调函数,不是 “协程”
  • loop.call_later(delay_time, call_back)

    • 在给定的延迟时间后,执行

手工实现

协程内实现

  1. 先休眠一段实现

    1
    
      await loop.sleep(delay)
  2. 执行要运行的任务

    1
    
      await your_target(*args)
  3. 安排结构—-放入消息循环 loop

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    # * 延迟执行协程
    async def sysexit(delay):
        import sys
        print('exit {} seconds later.'.format(delay))
        # 1) 休眠
        await asyncio.sleep(delay)
    
        print('exit now.')
        # 2) 执行任务
        sys.exit(0)
    
    
    # * 安排执行
    async def test_sysexit_from_child_thread():
        # import threading
        loop = asyncio.get_running_loop()
    
        print(loop.time())
    
        # 3) 安排放入消息循环
        loop.create_task(sysexit(5))
        # await sysexit(5)
        # loop.call_later(1*1000, sysexit())
        # loop.call_at(loop.time() + 1, sysexit())
        print(loop.time())
        # while True:
        print('sleep 0.1 second.')
        await asyncio.sleep(0.1)

异步运行 函数(function, 非协程)

  • 参考

  • 以 call 开头的方法

    • 快速调用

      • loop.call_soon
      • loop.call_soon_thread_safe
    • 延迟(编排)调用

      • loop.call_later
      • loop.call_at
    • executor 调用

      • loop.run_in_executor
  • asyncio 提供

    asyncio.to_thread
    在一个单独的线程中运行函数

异步运行 协程

  • loop 对象方法

    • loop.create_future
    • loop.create_task
  • 高级方法(asyncio 中提供)

    • 单个协程

      • asyncio.create_task
      • asyncio.create_task
      • await your_corotine
      • asyncio.run_coroutine_threadsafe(coro, loop)
    • 多个协程

      • asyncio.gather
      • asyncio.as_completed :: 返回完成的 coroutine iterator

包裹 实现功能

  • 设置 timeout

    • asyncio.wait_for
  • 禁用 Cancel

    • asyncio.shield

命名 习惯

  • 以 _for 结尾的方法,都是必须要有 timeout 参数的

    • eg:

      asyncio.wait
      可以不设置 timeout
      asyncio.wait_for
      必须设置 timeout

Loop 探测工具

  • asyncio.current_task
  • asyncio.all_tasks

时间

  • loop.time
  • asyncio.sleep
  • loop.call_later :: 延迟时间
  • loop.call_at :: 与 loop.time 相关的时间

普通函数 转换成 awaitable

  • 参考

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    import functools
    
    
    def awaitify(fun):
        """Convert general function to awaitable  function(async await)
        """
        @functools.wraps(fun)
        async def wrapper(*args, **kwargs):
            return fun(*args, **kwargs)
        return wrapper