coroutines + asyncio ---- python coroutine 协程笔记
文章目录
工具收集
参考:
消息循环工具
- libuv +
uvloop
asyncio 的类型替换框架
- trio
- curio
anyio
- 支持方便切换使用的消息循环实现(uvloop)
- 可以在 asyncio 和 trio 之间任意切换
工具 library
文件读写
- aiofiles
http 请求
aiohttp
- 专注 asyncio 下的 http 处理
- 性能更好,相比于 httpx
httpx
- asycio + sync
- 同步编程 + 异步编程都支持
- 支持 httpv2
数据库 + asyncio
sqlalchemy + asyncio
- Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation
asyncio + sqlalchemy + postgresql + asyncpg
- Connect to PostgreSQL with SQLAlchemy and asyncio — Makimo – Consultancy & So…
- database_url 格式:
postgresql+asyncpg://<db_username>:<db_secret>@<db_host>:<db_port>/<db_name>
- postgresql + asyncpg
- sqlite + aiosqlite
英语名词
| Words | explanation |
|---|---|
| prime | Vt. 使……做好准备 |
| coroutines must be primed. |
Tutorials
generator
- 是一个函数
- 但是调用后返回的不是结果
- 而是一个 iterator
| |
generator 用途
pipeline 组件
可以把一系列 generator, 组合完成逐个步骤
| |
yield
作用
抛出值
像上面 generator 中一样
获取外界传入的值
| |
这里 yield, 可以把从外界接收的值传给 line
方法:
1 2 3 4gen = grep('One') gen.next() # or gen.send(None) gen.send('One line') # --> print out gen.send('Second line') # --> print nothing
用在 coroutine 中,启动方法(priming)
- gen.send(None)
- next(gen) 或 gen.next()
Coroutine
Generator 实现
启动
generator 必须被预处理准备一下(be primed)
- 启动方法:gen.send(None) 或 next(gen) [in python3]
- 作用:让代码运行到第一次出现 yield 的地方
- 这样才能接受外面传入的数据
关闭
- 关闭方法:gen.close()
- 原因:避免 generator 无限运行下去
- 注意:python 垃圾回收器 Garbage collection 也能关闭,通过 close()
close()可以被 generator 捕捉到
| |
抛出异常
- 方法:gen.throw(RuntimeError, 'It is over.')
| |
辨析
Coroutine 不是 generator
- 我们注意到,generator 只产生值
- Coroutine, 同时产生和接收值
- Coroutine, generator 要进行启动
角色不同
- Coroutine 是消费者
- Generator 是生产者
是否迭代
- Generator 是连续的迭代
Coroutine 不是迭代
- 它是接收数据,再处理
使用场景
Pipelines
Filters
协程本质
实现异步的本质
- 通过 gen.send() 来实现
把协程分成 n 次 send() 调用
- 这样,可以随时通过调用不同协程的 send()
来实现运行的协程之间的切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19gen1 = Gen1() gen2 = Gen2() gen1.send(None) gen2.send(None) # * 异步切换 gen1.send('x') gen2.send('y') gen1.send('x1') gen2.send('y1') # * 同步顺序执行 gen1.send('x') gen1.send('x1') gen2.send('y') gen2.send('y1')
协程的工具模块
https://blog.csdn.net/weixin_41599977/article/details/93656042
注
IO 耗时操作
- 网络请求
- 文件读写
- asyncio.sleep 模拟
asyncio
python 标准库
greenlet
手动切换协程工具
- 使用 switch() 函数切换
| |
gevent
自动完成对 IO 操作的自动切换
- 使用 join()
| |
asyncio 模块
- 以下假设 hello() 会生成一个 coroutine 对象
单个 coroutine
asyncio.run
- 直接运行单个 coroutine
- 支持版本:python3.7
| |
loop
消息循环
run_until_complete()
接受类型
单个 coroutine
1 2loop = aysncio.get_event_loop() loop.run_until_complete(hello())把 coroutine 包装成 Task 对象
asyncio.Future 的子类,能保存 coroutine 的运行状态,和返回值
1 2 3# task = loop.create_task(hello()) task = loop.ensure_future(hello()) loop.run_until_complete(task)
awaitable
1 2 3# hello() 会生成一个 coroutine 对象 tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks))
task
创建 task
https://stackoverflow.com/a/36415477
loop.create_task()
- 不同类型的 loop, 实现方式不同,类似被重载了
- 自行实现时使用
- 来源:子类 Task
- 版本:>= python3.7
接受参数类型
- coroutine –> Task
loop.ensure_future()
- 行为,由输入参数来决定
- 来源:基类 Future
- 推荐使用
- 版本:支持老版本
接受参数类型
- coroutine —> Task
- awaitable —> Task
- Future object —> Future
可以看出
- ensure_future 接受类型更广泛
添加 call_back 回调函数
task.add_done_callback
接口
call_back(future)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15import asyncio async def work(x): for _ in range(3): print('Work {} is running ..'.format(x)) return "Work {} is finished".format(x) def call_back(future): print("Callback: {}".format(future.result())) coroutine = work(1) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) loop.run_until_complete(task)
概念辨析
coroutine
coroutine function
- 用于创建 coroutine
- coroutine object
awaitable
包括类型
- coroutine object
- Task
- Future
Task
- 被安排(scheduled)即将执行的 coroutine
Future
- low-level awaitable object
定时任务
类型
- 延后触发
- 固定时间触发
实现方法
注意
延迟失效问题
- 当消息循环中,没有别的可执行协程 coro 任务
即:待执行协程 coro 是唯一的任务时
- 这时,延迟功能失效,给定协程会被立即执行
标准库实现 asyncio
loop.create_task(coroutine)
- 需要 coroutine 自己管理延迟
loop.ensure_future(coro)
- 类似 create_task
loop.call_soon(callback, args, …)
- 下一次 loop 循环时立刻执行
- 注意:
- callback 是回调函数,不是 “协程”
loop.call_later(delay_time, call_back)
- 在给定的延迟时间后,执行
手工实现
协程内实现
先休眠一段实现
1await loop.sleep(delay)执行要运行的任务
1await your_target(*args)安排结构—-放入消息循环 loop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28# * 延迟执行协程 async def sysexit(delay): import sys print('exit {} seconds later.'.format(delay)) # 1) 休眠 await asyncio.sleep(delay) print('exit now.') # 2) 执行任务 sys.exit(0) # * 安排执行 async def test_sysexit_from_child_thread(): # import threading loop = asyncio.get_running_loop() print(loop.time()) # 3) 安排放入消息循环 loop.create_task(sysexit(5)) # await sysexit(5) # loop.call_later(1*1000, sysexit()) # loop.call_at(loop.time() + 1, sysexit()) print(loop.time()) # while True: print('sleep 0.1 second.') await asyncio.sleep(0.1)
异步运行 函数(function, 非协程)
参考
以 call 开头的方法
快速调用
- loop.call_soon
- loop.call_soon_thread_safe
延迟(编排)调用
- loop.call_later
- loop.call_at
executor 调用
- loop.run_in_executor
asyncio 提供
- asyncio.to_thread
- 在一个单独的线程中运行函数
异步运行 协程
包裹 实现功能
设置 timeout
- asyncio.wait_for
禁用 Cancel
- asyncio.shield
命名 习惯
以 _for 结尾的方法,都是必须要有 timeout 参数的
eg:
- asyncio.wait
- 可以不设置 timeout
- asyncio.wait_for
- 必须设置 timeout
Loop 探测工具
- asyncio.current_task
- asyncio.all_tasks
时间
- loop.time
- asyncio.sleep
- loop.call_later :: 延迟时间
- loop.call_at :: 与 loop.time 相关的时间
普通函数 转换成 awaitable
1 2 3 4 5 6 7 8 9 10import functools def awaitify(fun): """Convert general function to awaitable function(async await) """ @functools.wraps(fun) async def wrapper(*args, **kwargs): return fun(*args, **kwargs) return wrapper
sqlalchemy + asyncio 教程
文章作者
上次更新 2025-02-24 (77fafc9)