Python 异步(协程) 与 并发

First Post:

Last Update:

并发

概念

程序提速方法:

  • 单线程串行
  • 多线程并发 threading 模块
  • 多 CPU 并行 multiprocessing 模块
  • 多机器并行 hadoop/hive/spark 模块

异步

python 协程概念

单线程智能的去切换执行程序

实现协程的方法:

  • 通过第三方模块
  • yield 关键字
  • asyncio 模块1 (python >= 3.4)
  • async/await 关键字 (python >= 3.5) 推荐
事件循环

在事件循环中, 会去检测并执行事件循环内的代码(事件)

每个事件拥有两个状态:

  • 可执行
  • 已完成

事件循环会对其事件的状态进行处理, 当处理完所有事件后, 事件循环将会停止

1
2
# 任务列表
tasks = [任务1, 任务2, 任务3]
1
2
3
4
5
# 生成一个事件循环
loop = asyncio.get_event_loop()

# 将任务任务列表放入事件循环
loop.run_until_complete(tasks)
python 异步编程
async 关键字
  • 协程函数 async def func():
  • 协程对象 func()

当执行协程函数时, 会返回一个协程对象
协程函数内的代码不会被执行

要想运行协程函数的内部代码, 还需要使用事件循环 和 任务列表2

1
2
3
4
5
6
7
# 定义协程函数
async def func():
pass

# 执行协程函数得到 协程对象
# 协程对象 result
result = func()

事件循环的老写法:

1
2
3
4
5
6
7
8
async def func():
print("这是协程函数内的代码")

# python <= 3.7 的写法
# 执行事件循环中的事件
result = func()
loop = asyncio.get_event_loop()
loop.run_until_complete(result)

新写法:

1
2
3
4
5
6
7
8
9
10
async def func():
print("这是协程函数内的代码")

# 现代写法
result = func()
asyncio.run(result)

# asyncio.run(result) 等价于:
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)

await 关键字

await: 异步等待 (asynchronous wait)

在 python 中, 有 3 种可等待对象:

  • 协程对象 Coroutine
  • Future
  • Task 对象
1
2
# 使用方法
await 可等待对象

代码实例:

await 可以执行 协程对象

先执行所有代码, await 可等待对象 会执行操作并挂起
事件循环会一直等待
当可等待对象完成操作后, 事件循环将切换并执行下一个代码(可以简单认为 await 相当于回调)

以下代码均为同步代码 ↓

1
2
3
4
5
6
7
8
import asyncio

async def asy_test():
print("测试测试")
await asyncio.sleep(4)
print("结束的阻塞代码")

asyncio.run(asy_test())

Task 对象

用于在事件循环中添加多个任务

通过 asyncio.create_task(协程对象) 创建 Task 对象3
然后协程对象会加入事件列表中等待被事件循环调度执行

与 协程对象 不同的是, 事件循环会切换执行 Task 对象

不常用的实例:

异步代码 ↓

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def func():
print("开头")
await asyncio.sleep(3)
print("结尾")


# 最终程序耗时3秒
async def main():
print("主程序开始...")
# 创建Task对象
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())

print("Task对象创建结束")

ret1 = await task1
ret2 = await task2

print("最终值: ", ret1, ret2)

asyncio.run(main())

常用的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 改造上面的代码 ↑
# asyncio.create_task(协程对象, name="任务名称")
task_li = [
asyncio.create_task(func(), name="任务1"),
asyncio.create_task(func(), name="任务2")
]

# asyncio.wait(任务列表, timeout)
# timeout 设置时间限制 (默认time=None, 等待时间无限制)
# 将列表中的所有任务进行 await
# 返回元组 (done, pending)
# done: 返回集合
# pending: 未完成的任务
await asyncio.wait(task_li)

Future 对象 (asyncio)

不常用的底层对象 Task对象 的基类
Task 继承 Future, Task 内部结果的处理基于 Future

1
2
3
4
5
6
7
8
9
10
11
12
async def main():
# 获取事件循环
loop = asyncio.get_event_loop()

# 创建Future对象, 不做任何事
future = loop.create_future()

# 等待任务结果, 但由于没有做如何事
# 该代码会一直等下去
await future

asyncio.run(main())

另一个实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def set_after(future):
await asyncio.sleep(2)

# 手动设置 Future 任务的最终结果
future.set_result()

async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()

await loop.create_task(set_after(future))
data = await future
print(data)

asyncio.run(main())

asyncio 异步迭代器

迭代器: 实现 __iter__()__next__ 方法的对象

异步迭代器: __anext__ 方法必须返回一个可等待对象.
async for 会处理异步迭代器的 __anext__() 方法所返回的可等待对象,
直到其引发一个 StopAsyncIteration 异常

1
....

异步上下文管理器

此种对象通过定义 __aenter__()__aexit__() 方法
async with 语句的环境进行控制

上下文管理器: with

1
...

uvloop

简单理解为是异步事件循环的替代方案
通过第三方实现, uvloop 事件循环要优于默认 asyncio 的事件循环

uvloop 性能更好

1
pip install uvloop

代码示例

asyncio 模块异步编程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

# 协程函数(老写法, python>=3.8已弃用)
@asyncio.coroutine
def func1():
print(1)
# 模拟IO操作
# 当遇到消耗大量时间的IO操作时
# 线程会切换执行其他任务
yield from asyncio.sleep(2)
print(2)

# 现代写法
async def func2():
print(3)
await asyncio.sleep(2)
print(4)

# 事件列表
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

# 遇到IO自动切换
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

最终输出结果:

1
2
3
4
>>>1
>>>3
>>>2
>>>4

async/await 协程

asyncawait 原理与 asyncio 模块的实现方法一致
区别仅在于代码的简洁性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

async def func1():
print(1)
# 模拟IO操作
await asyncio.sleep(2)
print(2)

async def func2():
print(3)
await asyncio.sleep(2)
print(4)

# 事件列表
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

最终输出结果:

1
2
3
4
>>>1
>>>3
>>>2
>>>4

yiled 关键字

了解即可

yiled 返回数据

1
2
3
4
5
6
7
8
9
10
11
12
13
def func1():
yield 1
yield from func2()
yield 2

def func2():
yield 3
yield 4

f1 = func1()

for item in f1:
print(item)

最终输出结果:

1
2
3
4
>>>1
>>>3
>>>4
>>>2

greenlet 协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
form greenlet import greenlet

def func1():
print("1")

# 切换执行函数2
gr2.switch()

print("2")
gr2.switch()

def func2():
print("3")
gr1.switch()
print("4")

# 生成?
gr1 = greenlet(func1())
gr2 = greenlet(func2())

# 执行函数1
gr1.switch()

最终输出结果:

1
2
3
4
>>>1
>>>3
>>>2
>>>4

异步与阻塞混用

案例: asyncio + 不支持异步的模块
但需要多线程的编程知识, 暂时不管

简单来讲, 如果是单线程, asyncio + 不支持异步的模块
是难以做到异步的

1
...


  1. 1.python 标准库
  2. 2.任务列表, 实现异步的关键
  3. 3.还可以通过 asyncio.ensure_future()创建 Task (python <= 3.7)