为什么需要异步
# 同步:逐个等待
import time
def download(url):
time.sleep(1) # 模拟 IO 等待
return f"{url} done"
start = time.time()
for url in ["a", "b", "c"]:
download(url)
print(f"{time.time() - start:.1f}s") # 3.0s
# 异步:并发等待
import asyncio
async def download(url):
await asyncio.sleep(1) # 不阻塞事件循环
return f"{url} done"
async def main():
tasks = [download(url) for url in ["a", "b", "c"]]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
# 耗时 ~1.0s,三个任务并发执行
async/await 语法
# async def 定义协程函数 — 调用它不执行,返回协程对象
async def foo():
return 42
coro = foo() # 不执行,只是协程对象
result = await coro # 真正执行
# await 只能出现在 async 函数内部
# await 后面必须是 Awaitable(协程、Task、Future)
# 常见的可 await 对象:
await asyncio.sleep(1) # 等待
await some_coroutine() # 协程
await asyncio.gather(...) # 并发等
await task # Task 对象
事件循环
# 获取当前事件循环
loop = asyncio.get_event_loop()
# 主入口(Python 3.7+)
asyncio.run(main())
# 等价于:
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
Task 并发
# create_task:立即调度,不阻塞
async def main():
task1 = asyncio.create_task(download("a"))
task2 = asyncio.create_task(download("b"))
# 两个任务已经在后台跑了
result1 = await task1 # 等 task1 完成
result2 = await task2 # 等 task2 完成
# gather:批量等待
results = await asyncio.gather(
download("a"),
download("b"),
download("c"),
return_exceptions=True # 单个失败不影响其他
)
# as_completed:哪个先完成先处理
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完成: {result}")
| 方法 | 返回时机 | 适用场景 |
|---|
gather | 全部完成 | 批量请求 |
as_completed | 逐个完成 | 哪个快处理哪个 |
wait | 可配置 | FIRST_COMPLETED / ALL_COMPLETED |
实战:aiohttp 网络请求
import aiohttp
import asyncio
async def fetch(url, session):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(url, session) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Got {len(results)} responses")
asyncio.run(main()) # ~1s, not 3s
常见陷阱
陷阱 1:async 函数里调同步阻塞代码
async def bad():
time.sleep(2) # ❌ 阻塞整个事件循环
result = requests.get(url) # ❌ requests 是同步库
async def good():
await asyncio.sleep(2) # ✅
# 用 aiohttp 替代 requests
陷阱 2:忘记 await
asyncio.create_task(download("a")) # ⚠️ 没有 await,可能还没跑完 main 就退出了
# 解决:存引用,最后 await
task = asyncio.create_task(download("a"))
await task
陷阱 3:协程对象重复 await
coro = download("a")
await coro # ✅ 第一次
await coro # ❌ RuntimeError: cannot reuse already awaited coroutine
# 正确:想多次执行就每次重新调用
异步 vs 多线程
| asyncio | threading |
|---|
| 适合 | IO 密集型 | IO / CPU 密集型 |
| 切换成本 | 极低(协程切换) | 较高(线程切换) |
| 并发数 | 数千个 | 几十个 |
| GIL | 不受影响 | 受 GIL 限制 |
| 学习曲线 | 较高 | 中等 |
| 调试 | 较难 | 相对容易 |