前言
说实话,我python用了应该也有四五年了,但是从来没用到过**asyncio**
这玩意…所以今天大概是有点心血来潮来稍微学习一下这个冷门库。一般来说不是生产环境,而是自己整点小需求搞定的话,估计是用不到这个库的。
所以这篇文章主要就是入门一下,没有过深的讲解。
什么是异步?
比如用同步io发网络请求,首先cpu通知网卡发送和接收数据包,然后为了在完成后及时进行后续处理,cpu要不断的询问网卡,完成了吗完成了吗完成了吗。这样写代码是最直观的,但很多时间就在这个询问的过程中白白消耗掉了。
但实际上,cpu不一定要不断询问网卡,网卡可以在完成后主动通知cpu,进行后续操作。这样一来,没有了不断询问的这个过程,这段时间cpu就可以干别的。
作者:Meltsun墨小残
链接:https://www.zhihu.com/question/451397804/answer/3118860310
大概这么理解就好了。
入门案例
首先一点要明确,如果要使用异步io,则必须要使用异步的定制函数,否则会报错。
比如实现这样的函数
def main():
time.sleep(1)
print("done")
if __name__ == '__main__':
main()
异步的情况是
async def main():
# asyncio.sleep(1) 这样写将出现 RuntimeWarning 然后这行语句会被直接跳过执行 print("done")
# await time.sleep(1) 这样写会报错 TypeError 因为非异步库并不返回Coroutine对象
await asyncio.sleep(1)
print("done")
if __name__ == '__main__':
asyncio.run(main())
上述代码中的await删除后,直接使用asyncio.sleep(1)
,就会发现程序不会进行任何等待,只会产生一个RuntimeWarning
。也就是说,该行代码直接被跳过了。
这是由于简单调用一个异步函数并不能执行函数,而是会把函数包装成一个Coroutine
,然后返回这个Coroutine
。
a = asyncio.sleep(1)
print(a)
# 输出 <coroutine object sleep at 0x000002ADFC227DC0>
想执行异步函数,我们需要把这个Coroutine
提交给asyncio
底层,让它帮我们执行。使用await就是实现这一点最常见的方式。这就是为什么python的异步需要用到asyncio.run
,async
,await
等。
await一个异步函数返回的Coroutine
,就像调用一个普通的同步函数一样,跳转过去,执行,然后返回。因此,我们说**Coroutine**
是同步的。
如何实现并发?
用Task(本质上是Future,但我们目前只需要用到task)。
用asyncio.create_task
就可以把Coroutine
打包成Task。它们的不同点在于,Coroutine
是同步的,而Task是异步的。
之前我们提到过,想执行异步函数,我们要把它提交给asyncio
,让它帮我们执行。如果我们有一个Coroutine
,我们必须await它,才能把相应的异步函数提交给asyncio
。
而Task则不然,它在创建的时候就已经把相应的异步函数提交给了asyncio
,因此,之后我们await任何东西,**asyncio**
都会尝试执行Task里边的异步函数。
更进一步,想象如果我们创建了很多Task会发生什么:
-
如果其中一个出现异步阻塞(await一个不能马上完成的东西)
-
asyncio
会尝试推进其他Task(await任何东西,asyncio
都会尝试执行Task)
这种异步能带来效率上的提升。这其实就是asyncio
这套逻辑的核心思想。在大多数时候我们需要做的事情就是把需要并发的代码封装成Task,然后await它们。
import asyncio
async def my_coroutine(t):
print(f"Coroutine {t} is executing...")
await asyncio.sleep(t)
print(f"Coroutine {t} is done.")
myID = '114514_' + str(t)
return myID
async def main():
print("直接await协程对象:")
id1 = await my_coroutine(1)
id2 = await my_coroutine(2)
print(f'{id1=}, {id2=}')
print('----------------------------------------------------------------')
print("包装成任务,然后await:")
task1 = asyncio.create_task(my_coroutine(1))
task2 = asyncio.create_task(my_coroutine(2))
id1 = await task1
print("在这行之前,task2就已经开始执行了:")
id2 = await task2
print(f'{id1=}, {id2=}')
print('----------------------------------------------------------------')
print("随便await什么,任务其实都能开始执行:")
task1 = asyncio.create_task(my_coroutine(1))
task2 = asyncio.create_task(my_coroutine(2))
await asyncio.sleep(0.1)
print('----------------------------------------------------------------')
asyncio.run(main())
上面的代码输出为
直接await协程对象:
Coroutine 1 is executing...
Coroutine 1 is done.
Coroutine 2 is executing...
Coroutine 2 is done.
id1='114514_1', id2='114514_2'
----------------------------------------------------------------
包装成任务,然后await:
Coroutine 1 is executing...
Coroutine 2 is executing...
Coroutine 1 is done.
在这行之前,task2就已经开始执行了:
Coroutine 2 is done.
id1='114514_1', id2='114514_2'
----------------------------------------------------------------
随便await什么,任务其实都能开始执行:
Coroutine 1 is executing...
Coroutine 2 is executing...
----------------------------------------------------------------
注意,从第二个分割线开始,异步io的核心特质就出现了。
一旦我们开始执行了asyncio.create_task
,可以发现在函数my_coroutine
中await语句之前的print已经被执行了。但是只要我们没有调用比如 id1 = await task1
,函数就会阻塞在await语句这里,不会尝试去运行下面的语句以获得函数的结果。
await一个Coroutine
的含义是 开始执行、等待执行完成、获取结果。
而await一个Task,由于Task在之前就已经开始了,所以这里的含义是等待执行完成、获取结果。
下面代码展示了不await Task也能运行Task,个人认为很有助于理解Task的使用。
import asyncio
async def my_coroutine(id: int, results: list):
print(f"Coroutine {id} is executing...")
for i in range(10): # 从这里可以看出执行完循环至少需要1秒
await asyncio.sleep(0.1)
print(i, end='') # 不要换行
print(f"Coroutine {id} is done.")
results.append(f"result of Coroutine {id}")
async def main():
results = []
asyncio.create_task(my_coroutine(1, results))
asyncio.create_task(my_coroutine(2, results))
print(results)
await asyncio.sleep(0.5) # 我们尝试一下不给task充足的执行时间
print(results)
asyncio.run(main())
打印结果为
[]
Coroutine 1 is executing...
Coroutine 2 is executing...
00112233[]
也就是说,对于一个task来说,但凡在后面使用了await语句,所有之前被创造出来的task都会尝试去执行被await阻塞的语句。如果时间不够,则不执行完成,也不会有返回结果。我们试试给予充足的时间看看,修改*await *``asyncio.sleep(3)
,打印结果就会如下:
[]
Coroutine 1 is executing...
Coroutine 2 is executing...
0011223344556677889Coroutine 1 is done.
9Coroutine 2 is done.
['result of Coroutine 1', 'result of Coroutine 2']
此时results就会有结果,函数也会运行完毕。
一个实战案例:
import asyncio
import time
async def my_coroutine_1():
print('该函数用以准备数据')
data = []
for i in range(20):
# time.sleep(0.1)
await asyncio.sleep(0.1)
data.append(i)
print(f'数据装填中 {data}')
print('数据装填完毕')
return data
async def my_coroutine_2():
print('该函数用以准备另一份数据数据')
data = []
for i in range(10):
# time.sleep(0.2)
await asyncio.sleep(0.1)
data.append(i * 0.01)
print(f'数据装填中 {data}')
print('数据装填完毕')
return data
async def main():
start = time.time()
task1 = asyncio.create_task(my_coroutine_1())
task2 = asyncio.create_task(my_coroutine_2())
print('主程序要进行加载前准备')
for i in range(10):
# time.sleep(0.15)
await asyncio.sleep(0.05)
print(f'主程序准备完成 {i * 10}%')
print('主程序加载完毕,准备执行相加')
data1 = await task1
data2 = await task2
print(data1 + data2)
print(f'用时{time.time() - start} s')
asyncio.run(main())
最终执行结果是需要2s左右,比串行的效率是高不少。
从打印结果上来看,一开始是并行的,后面主程序先打印出加载完毕,然后是data2准备完毕,最后是data2完成,这些全部完成之后,代码会执行相加,输出最终完整结果。
如果我注释掉这几行
# data1 = await task1
# data2 = await task2
# print(data1 + data2)
则最后的输出会成为
plain text
...
数据装填中 [0, 1, 2, 3, 4]
数据装填中 [0.0, 0.01, 0.02, 0.03, 0.04]
主程序准备完成 90%
主程序加载完毕,准备执行相加
用时0.549534797668457 s
上述代码实现的要求就是中间必须有异步io的实现方法,把代码中所有的await asyncio.sleep()
改为time.sleep()
来运行的话,上述脚本会完全变成一个串行函数。
参考
python asyncio的设计晦涩难懂,一点也不python,是做毁了吗? - 知乎
主要参考@Meltsun墨小残的回答。
[文章导入自 http://qzq-go.notion.site/12649a7b4e7580888a00c47d0e788df9 访问原文获取高清图片]