Python并发编程深度指南
Python并发是很多开发者的痛点——GIL为什么这么设计?asyncio到底是怎么工作的?多进程、多线程、协程如何选择?本文一次性说清楚。
一、GIL:Python的设计折衷
GIL(Global Interpreter Lock)是CPython的一个全局互斥锁,同一时刻只有一个线程能执行Python字节码。这意味着多线程无法真正并行利用多核CPU。
# 验证GIL的影响
import threading, time
def cpu_task(n):
total = 0
for i in range(n):
total += i * i
return total
# 单线程
start = time.time()
cpu_task(50_000_000)
cpu_task(50_000_000)
print(f'单线程: {time.time()-start:.2f}s')
# 双线程(期待2x速度,实际几乎一样慢!)
start = time.time()
t1 = threading.Thread(target=cpu_task, args=(50_000_000,))
t2 = threading.Thread(target=cpu_task, args=(50_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f'双线程: {time.time()-start:.2f}s') # 几乎和单线程一样!
# 结论: CPU密集型任务必须用多进程绕过GIL
二、asyncio事件循环工作原理
# 事件循环本质: 一个超级高效的while循环
# 简化版原理:
#
# while True:
# ready_tasks = get_ready_tasks() # 哪些协程可以运行
# for task in ready_tasks:
# task.step() # 运行到下一个await
# io_callbacks = poll_io_events() # 检查IO事件
# for cb in io_callbacks:
# cb.resume() # 唤醒等待IO的协程
# 关键: await表达式=暂停当前协程,把控制权交给事件循环
# 事件循环在等待IO时可以运行其他协程,实现并发
import asyncio, aiohttp, time
async def fetch(session, url):
async with session.get(url) as resp:
return len(await resp.text())
async def main():
urls = [f'https://httpbin.org/delay/1'] * 10 # 每个请求1秒
async with aiohttp.ClientSession() as session:
start = time.time()
# gather并发执行所有请求
results = await asyncio.gather(*[fetch(session, u) for u in urls])
print(f'10个请求,耗时: {time.time()-start:.2f}s') # ~1秒,而非10秒!
asyncio.run(main())
三、三种并发方式性能对比
| 场景 | 推荐方式 | 原因 | 典型库 |
|---|---|---|---|
| 大量HTTP请求 | asyncio协程 | IO等待期间可并发 | aiohttp |
| CPU密集计算 | multiprocessing | 绕过GIL,真正多核 | concurrent.futures |
| 文件/数据库IO | 线程池 | 简单,性能够用 | ThreadPoolExecutor |
| 混合IO+CPU | asyncio+进程池 | 协程处理IO,进程池处理CPU | loop.run_in_executor |
# 混合场景: asyncio+进程池
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_heavy(data):
"""CPU密集型函数"""
import hashlib
return hashlib.sha256(data * 10000).hexdigest()
async def main():
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=4)
# 把CPU任务扔进进程池,不阻塞事件循环
results = await asyncio.gather(*[
loop.run_in_executor(executor, cpu_heavy, b'data')
for _ in range(8)
])
print(f'完成{len(results)}个CPU任务')
四、生产级并发模式:限速与超时
import asyncio, aiohttp
async def fetch_with_limit(urls, max_concurrent=20, timeout=10):
"""带并发限制和超时的批量请求"""
semaphore = asyncio.Semaphore(max_concurrent) # 最多20个并发
results = []
async def fetch_one(session, url):
async with semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
return {'url': url, 'status': resp.status, 'ok': True}
except asyncio.TimeoutError:
return {'url': url, 'error': 'timeout', 'ok': False}
except Exception as e:
return {'url': url, 'error': str(e), 'ok': False}
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
results = await asyncio.gather(*tasks)
ok = sum(1 for r in results if r['ok'])
print(f'完成: {ok}/{len(urls)} 成功')
return results
总结:Python并发的核心原则——IO密集型用asyncio,CPU密集型用multiprocessing,不要用多线程处理CPU任务。asyncio的事件循环本质是协作式调度,await是协作让出控制权。理解这个模型,你就能写出正确高效的异步代码。
