[深度]Python并发编程完全指南:GIL原理、asyncio事件循环与高并发实战

阿里云推广

Python并发编程深度指南

Python并发是很多开发者的痛点——GIL为什么这么设计?asyncio到底是怎么工作的?多进程、多线程、协程如何选择?本文一次性说清楚。

一、GIL:Python的设计折衷

GIL(Global Interpreter Lock)是CPython的一个全局互斥锁,同一时刻只有一个线程能执行Python字节码。这意味着多线程无法真正并行利用多核CPU。

# 验证GIL的影响
import threading, time

def cpu_task(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

# 单线程
start = time.time()
cpu_task(50_000_000)
cpu_task(50_000_000)
print(f'单线程: {time.time()-start:.2f}s')

# 双线程(期待2x速度,实际几乎一样慢!)
start = time.time()
t1 = threading.Thread(target=cpu_task, args=(50_000_000,))
t2 = threading.Thread(target=cpu_task, args=(50_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f'双线程: {time.time()-start:.2f}s')  # 几乎和单线程一样!

# 结论: CPU密集型任务必须用多进程绕过GIL

二、asyncio事件循环工作原理

# 事件循环本质: 一个超级高效的while循环
# 简化版原理:
#
# while True:
#   ready_tasks = get_ready_tasks()  # 哪些协程可以运行
#   for task in ready_tasks:
#       task.step()  # 运行到下一个await
#   io_callbacks = poll_io_events()  # 检查IO事件
#   for cb in io_callbacks:
#       cb.resume()  # 唤醒等待IO的协程

# 关键: await表达式=暂停当前协程,把控制权交给事件循环
# 事件循环在等待IO时可以运行其他协程,实现并发

import asyncio, aiohttp, time

async def fetch(session, url):
    async with session.get(url) as resp:
        return len(await resp.text())

async def main():
    urls = [f'https://httpbin.org/delay/1'] * 10  # 每个请求1秒
    async with aiohttp.ClientSession() as session:
        start = time.time()
        # gather并发执行所有请求
        results = await asyncio.gather(*[fetch(session, u) for u in urls])
        print(f'10个请求,耗时: {time.time()-start:.2f}s')  # ~1秒,而非10秒!

asyncio.run(main())

三、三种并发方式性能对比

场景 推荐方式 原因 典型库
大量HTTP请求 asyncio协程 IO等待期间可并发 aiohttp
CPU密集计算 multiprocessing 绕过GIL,真正多核 concurrent.futures
文件/数据库IO 线程池 简单,性能够用 ThreadPoolExecutor
混合IO+CPU asyncio+进程池 协程处理IO,进程池处理CPU loop.run_in_executor
# 混合场景: asyncio+进程池
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(data):
    """CPU密集型函数"""
    import hashlib
    return hashlib.sha256(data * 10000).hexdigest()

async def main():
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor(max_workers=4)

    # 把CPU任务扔进进程池,不阻塞事件循环
    results = await asyncio.gather(*[
        loop.run_in_executor(executor, cpu_heavy, b'data')
        for _ in range(8)
    ])
    print(f'完成{len(results)}个CPU任务')

四、生产级并发模式:限速与超时

import asyncio, aiohttp

async def fetch_with_limit(urls, max_concurrent=20, timeout=10):
    """带并发限制和超时的批量请求"""
    semaphore = asyncio.Semaphore(max_concurrent)  # 最多20个并发
    results = []

    async def fetch_one(session, url):
        async with semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
                    return {'url': url, 'status': resp.status, 'ok': True}
            except asyncio.TimeoutError:
                return {'url': url, 'error': 'timeout', 'ok': False}
            except Exception as e:
                return {'url': url, 'error': str(e), 'ok': False}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r['ok'])
    print(f'完成: {ok}/{len(urls)} 成功')
    return results

总结:Python并发的核心原则——IO密集型用asyncio,CPU密集型用multiprocessing,不要用多线程处理CPU任务。asyncio的事件循环本质是协作式调度,await是协作让出控制权。理解这个模型,你就能写出正确高效的异步代码。

发表评论