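1. Why Structured Concurrency Is a Game Changer
1.1 The Three Nightmares of asyncio Today
Let me start with a real scenario. Last month I wrote a batch download script that looked roughly like this: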
```python
import asyncio

import aiohttp


async def download_file(session, url):
    async with session.get(url) as response:
        return await response.read()


async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(download_file(session, url))
            tasks.append(task)
        # Problem: if one download fails, gather() raises immediately,
        # but the other tasks are NOT cancelled and keep running
        results = await asyncio.gather(*tasks)
        return results


# Placeholder URLs
asyncio.run(main(["https://example.com/a.bin", "https://example.com/b.bin"]))
```
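The code looks fine, but once downloads start failing or need to be cancelled midway, you run into the following:
Nightmare one: orphaned tasks. When one download fails, gather() propagates the first exception immediately, but the remaining tasks are not cancelled: they keep running in the background (here, against a session the async with block is about to close). More generally, a task started with create_task() lives on independently of the code that created it unless that code explicitly awaits or cancels it. This is "task leakage".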
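A minimal, network-free sketch of the orphan problem: fails_fast() and slow_worker() are hypothetical stand-ins for two downloads. gather() raises as soon as the first one fails, while the other is still running and nobody cancels it:

```python
import asyncio


async def fails_fast():
    await asyncio.sleep(0.1)
    raise ValueError("boom")


async def slow_worker(log):
    await asyncio.sleep(0.3)
    log.append("slow_worker finished after gather() had already raised")


async def main():
    log = []
    slow = asyncio.create_task(slow_worker(log))
    try:
        await asyncio.gather(fails_fast(), slow)
    except ValueError:
        # gather() has given up, but nothing cancelled slow_worker
        log.append(f"gather raised; slow_worker done? {slow.done()}")
    await asyncio.sleep(0.5)  # keep the loop alive long enough to watch the orphan finish
    return log


log = asyncio.run(main())
for line in log:
    print(line)
```

The first line shows slow_worker still pending when the exception arrives; the second shows it completing on its own afterwards.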
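Nightmare two: timeout hell. I've seen people handle timeouts like this:
```python
import asyncio


async def some_long_operation():
    await asyncio.sleep(10)  # stand-in for real work


async def with_timeout():
    task = asyncio.create_task(some_long_operation())
    try:
        result = await asyncio.wait_for(task, timeout=5.0)
        return result
    except asyncio.TimeoutError:
        # Redundant: wait_for() has already cancelled the task and waited for it
        task.cancel()
        try:
            await task  # wait for the task to actually finish cancelling
        except asyncio.CancelledError:
            pass
        return None
```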
Nested try/except blocks like this turn into a disaster once there are several timeouts to manage.
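Much of that boilerplate is unnecessary: on timeout, asyncio.wait_for() already cancels the awaited operation and waits for the cancellation to finish before raising. A small sketch to check this; some_long_operation here is a hypothetical variant that records whether it was cancelled:

```python
import asyncio


async def some_long_operation(state):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        state["cancelled"] = True  # wait_for() cancelled us
        raise


async def main():
    state = {"cancelled": False}
    try:
        await asyncio.wait_for(some_long_operation(state), timeout=0.1)
    except asyncio.TimeoutError:  # alias of the built-in TimeoutError since 3.11
        pass
    # No manual cancel()/await needed: the cancellation has already completed
    return state["cancelled"]


was_cancelled = asyncio.run(main())
print(was_cancelled)  # True
```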
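Nightmare three: lost context. Once you launch a background task, its link to the code that started it is gone: cancelling the parent does not propagate to the child, and logging and error handling end up scattered.
1.2 What Structured Concurrency Is
The core idea of structured concurrency is simple: a child task's lifetime should be managed by its parent.
Think of a workbook in Excel: you create worksheets inside it, and when you close the workbook its worksheets close with it; you don't close them one by one. That is the basic idea of structured programming, and structured concurrency applies it to concurrent code.
Python 3.11 already brought the core of this model into asyncio with asyncio.TaskGroup and asyncio.timeout(), and later releases (the 3.15 to 3.17 timeframe) are expected to integrate more of the anyio/Trio structured-concurrency patterns natively. The key pieces are:
- TaskGroup: manages a group of tasks; child tasks automatically receive the parent's cancellation
- Hierarchical cancellation: when the parent is cancelled, all child tasks are cancelled automatically
- Shielding: asyncio.shield() protects critical operations from being cancelled by accident
- Structured exit: every task must finish or be cancelled before the parent leaves the group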

2. TaskGroup: Goodbye, Orphaned Tasks
2.1 Basic Usage
TaskGroup is at the heart of structured concurrency, and it is very simple to use:
```python
import asyncio


async def task_a():
    print("Task A started")
    await asyncio.sleep(1)
    print("Task A finished")
    return "A"


async def task_b():
    print("Task B started")
    await asyncio.sleep(0.5)
    print("Task B finished")
    return "B"


async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task_a())
        tg.create_task(task_b())
    print("All tasks finished")


asyncio.run(main())
```
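Output:
```plaintext
Task A started
Task B started
Task B finished
Task A finished
All tasks finished
```
It looks much like the plain version, but the key difference is this: when the async with block exits, the TaskGroup waits for every child task to finish, and if the block is left because of an error or because main() is cancelled, it cancels the remaining children first.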
2.2 Cancellation Propagation
This is the most exciting improvement:
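```python
import asyncio


async def long_task(name, delay):
    try:
        print(f"{name} started, takes {delay}s")
        await asyncio.sleep(delay)
        print(f"{name} finished")
        return f"{name} result"
    except asyncio.CancelledError:
        print(f"{name} cancelled")
        raise  # always re-raise CancelledError


async def main():
    try:
        # After 3.5 seconds the timeout cancels main(); the TaskGroup
        # reacts by cancelling every child that is still running
        async with asyncio.timeout(3.5):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(long_task("Task 1", 10))
                tg.create_task(long_task("Task 2", 5))
                tg.create_task(long_task("Task 3", 3))
    except TimeoutError:
        print("Deadline reached, remaining tasks cancelled")
    # Leaving the block normally would NOT cancel anything:
    # the TaskGroup would simply wait for all three tasks
    print("TaskGroup has exited")


asyncio.run(main())
```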
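Output (the two "cancelled" lines may appear in either order):
```plaintext
Task 1 started, takes 10s
Task 2 started, takes 5s
Task 3 started, takes 3s
Task 3 finished
Task 1 cancelled
Task 2 cancelled
Deadline reached, remaining tasks cancelled
TaskGroup has exited
```
Note: leaving the async with block normally does not cancel anything; the TaskGroup simply waits for its children. They are cancelled automatically when the block is left abnormally: the parent task is cancelled (here by asyncio.timeout()), the body raises an exception, or one of the children fails. This is "hierarchical cancellation".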
2.3 Exception Handling
Another powerful TaskGroup feature is how it handles exceptions:
```python
import asyncio


async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")


async def normal_task():
    await asyncio.sleep(2)  # cancelled when failing_task fails at t=1s
    return "Normal task finished"


async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task())
            tg.create_task(normal_task())
    except BaseExceptionGroup as e:
        print(f"Caught exception group: {e}")
        # Iterate over the individual exceptions
        for exc in e.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")


asyncio.run(main())
```
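Output (normal_task was cancelled rather than failing, so the group holds a single exception):
```plaintext
Caught exception group: unhandled errors in a TaskGroup (1 sub-exception)
  - ValueError: Something went wrong!
```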
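3. Hands-On: Refactoring a Crawler Project
3.1 The Old Version
Let me use a real crawler scenario to compare before and after. This is an image crawler I wrote earlier: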
```python
import asyncio
from pathlib import Path

import aiohttp


class ImageCrawler:
    def __init__(self, concurrency=10):
        self.concurrency = concurrency
        self.results = []
        self.failed = []
        self._tasks = set()

    async def download_one(self, session, url, path):
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    Path(path).write_bytes(content)
                    return True
                return False
        except Exception as e:
            print(f"Download failed {url}: {e}")
            return False

    async def crawl(self, urls):
        connector = aiohttp.TCPConnector(limit=self.concurrency)
        async with aiohttp.ClientSession(connector=connector) as session:
            for url in urls:
                task = asyncio.create_task(
                    self.download_one(session, url, f"images/{hash(url)}.jpg")
                )
                # Keep a strong reference by hand, drop it when the task is done
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)
            # Wait for all tasks to finish
            await asyncio.gather(*self._tasks, return_exceptions=True)
        return self.results

# Problem: task lifetimes are managed by hand (self._tasks + done callbacks)
# Problem: exception handling is convoluted (return_exceptions=True mixes
#          errors into the results, and self.results is never filled)
# Problem: no timeout control
```
3.2 The New Version (Using Structured Concurrency)
```python
import asyncio
from pathlib import Path

import aiohttp


class ImageCrawler:
    def __init__(self, concurrency=10, timeout=30):
        self.concurrency = concurrency
        self.timeout = timeout
        self.results = []
        self.failed = []

    async def download_one(self, session, url, path, semaphore):
        async with semaphore:  # cap the number of concurrent downloads
            try:
                async with asyncio.timeout(self.timeout):
                    async with session.get(url) as resp:
                        if resp.status == 200:
                            content = await resp.read()
                            # Create the parent directory, not a directory at the file path
                            Path(path).parent.mkdir(parents=True, exist_ok=True)
                            Path(path).write_bytes(content)
                            self.results.append(url)
                            return True
                        self.failed.append((url, resp.status))
                        return False
            except asyncio.TimeoutError:
                self.failed.append((url, "timeout"))
                return False
            except Exception as e:
                self.failed.append((url, str(e)))
                return False

    async def crawl(self, urls):
        connector = aiohttp.TCPConnector(limit=self.concurrency)
        semaphore = asyncio.Semaphore(self.concurrency)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncio.TaskGroup() as tg:
                for url in urls:
                    tg.create_task(
                        self.download_one(
                            session,
                            url,
                            # str hashes are randomized per process, so names change between runs
                            f"images/{hash(url)}.jpg",
                            semaphore,
                        )
                    )
        print(f"Succeeded: {len(self.results)}, failed: {len(self.failed)}")
        return self.results

# Advantages:
# 1. TaskGroup manages the lifetime of every task automatically
# 2. On Ctrl+C, all in-flight tasks are cancelled automatically
# 3. Exceptions cannot leave orphaned tasks behind
# 4. asyncio.timeout() makes timeout handling much clearer
```
3.3 A Complete Batch Download Example
```python
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

import aiohttp


@dataclass
class DownloadResult:
    url: str
    success: bool
    error: str = ""


class BatchDownloader:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = None

    async def download(
        self,
        session: aiohttp.ClientSession,
        url: str,
        path: str,
        timeout: float = 30.0,
    ) -> DownloadResult:
        async with self.semaphore:
            try:
                async with asyncio.timeout(timeout):
                    async with session.get(url) as resp:
                        if resp.status == 200:
                            content = await resp.read()
                            Path(path).parent.mkdir(parents=True, exist_ok=True)
                            Path(path).write_bytes(content)
                            return DownloadResult(url=url, success=True)
                        return DownloadResult(
                            url=url,
                            success=False,
                            error=f"HTTP {resp.status}",
                        )
            except asyncio.TimeoutError:
                return DownloadResult(url=url, success=False, error="timeout")
            except Exception as e:
                return DownloadResult(url=url, success=False, error=str(e))

    async def batch_download(
        self,
        items: List[Tuple[str, str]],
        timeout: float = 30.0,
    ) -> List[DownloadResult]:
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(self.download(session, url, path, timeout))
                    for url, path in items
                ]
        # Every task has finished here; collect the results in input order
        return [task.result() for task in tasks]


# Usage example
async def main():
    downloader = BatchDownloader(max_concurrent=5)
    items = [
        ("https://example.com/image1.jpg", "downloads/img1.jpg"),
        ("https://example.com/image2.jpg", "downloads/img2.jpg"),
        ("https://example.com/image3.jpg", "downloads/img3.jpg"),
    ]
    results = await downloader.batch_download(items)
    success = sum(1 for r in results if r.success)
    print(f"Download finished: {success}/{len(results)} succeeded")


asyncio.run(main())
```
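4. Shield: Protecting Critical Tasks
4.1 When You Need Shield
Sometimes you want a task to keep running even when the code awaiting it is cancelled or times out. For example: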
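```python
import asyncio


async def save_to_database(data):
    """A critical task that must not be cancelled by accident"""
    await asyncio.sleep(2)  # simulate a database write
    print("Data saved")
    return True


async def fetch_data():
    """Fetches data; may be cancelled"""
    await asyncio.sleep(1)
    return {"key": "value"}


async def main():
    async with asyncio.TaskGroup() as tg:
        save_task = tg.create_task(save_to_database({"critical": True}))
        try:
            # The timeout cancels only the shield() wrapper, not save_task
            await asyncio.wait_for(asyncio.shield(save_task), timeout=0.5)
        except asyncio.TimeoutError:
            print("Main task timed out, but save_to_database keeps running")
        # save_task keeps running until it finishes; the TaskGroup waits for it.
        # Caveat: if the group itself aborts (main() cancelled, a sibling fails),
        # it cancels save_task directly and shield() does not help.


asyncio.run(main())
```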
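4.2 A Practical Scenario
A typical application of Shield is graceful shutdown: once cleanup has started, it should run to completion even if the code awaiting it is cancelled.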
```python
import asyncio
import signal


class GracefulShutdown:
    def __init__(self):
        self.shutdown_complete = asyncio.Event()
        # Open connections, tracked so shutdown() can close them
        self.connections: set[asyncio.StreamWriter] = set()

    async def handle_request(self, reader, writer):
        addr = writer.get_extra_info("peername")
        self.connections.add(writer)
        print(f"New connection: {addr}")
        try:
            await reader.read(1024)
            writer.write(b"ACK")
            await writer.drain()
        except ConnectionError:
            pass  # the peer (or shutdown()) closed the connection
        finally:
            self.connections.discard(writer)
            writer.close()
            await writer.wait_closed()
            print(f"Connection closed: {addr}")

    async def run_server(self):
        server = await asyncio.start_server(
            self.handle_request, "127.0.0.1", 8888
        )
        async with server:
            await server.serve_forever()

    async def shutdown(self, grace_period=5.0):
        print("Shutting down...")
        # Close every open connection
        for writer in list(self.connections):
            writer.close()
        # Give the handlers up to grace_period seconds to finish their cleanup
        try:
            async with asyncio.timeout(grace_period):
                while self.connections:
                    await asyncio.sleep(0.1)
        except TimeoutError:
            pass
        self.shutdown_complete.set()
        print("Shutdown complete")


async def main():
    app = GracefulShutdown()
    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()
    # loop.add_signal_handler() is only available on Unix
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_event.set)
    server_task = asyncio.create_task(app.run_server())
    await shutdown_event.wait()
    # shield(): cancelling main() during this await won't interrupt the cleanup itself
    await asyncio.shield(app.shutdown())
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```
5. Migration Guide: From Old Code to Structured Concurrency
5.1 Common Old Patterns and Their Replacements
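| Old pattern | New pattern | Notes |
|---|---|---|
| asyncio.create_task() + manual bookkeeping | TaskGroup.create_task() | TaskGroup manages task lifetimes automatically |
| asyncio.wait_for() + manual cancel() | asyncio.timeout() | Clearer timeout handling; one deadline can cover several awaits |
| gather(return_exceptions=True) | TaskGroup exception groups | Better exception handling; note that the first failure cancels the remaining tasks |
| Manual task.cancel() + await task | Automatic hierarchical cancellation | No more boilerplate |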
5.2 Step-by-Step Migration Strategy
```python
import asyncio


async def process(item):
    # Stand-in for real per-item work
    await asyncio.sleep(0.1)
    return item * 2


# Old code
async def old_pattern(items):
    tasks = []
    for item in items:
        task = asyncio.create_task(process(item))
        tasks.append(task)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results


# New code, pattern 1: straight replacement
async def new_pattern_simple(items):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process(item)) for item in items]
    # All tasks are done once the block exits; results keep input order
    return [task.result() for task in tasks]


# New code, pattern 2: a result collector
class ResultCollector:
    def __init__(self):
        self.results = []
        # Not strictly required: list.append never yields to the event loop
        self._lock = asyncio.Lock()

    async def process_and_collect(self, item):
        result = await process(item)
        async with self._lock:
            self.results.append(result)


async def new_pattern_with_results(items):
    collector = ResultCollector()
    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(collector.process_and_collect(item))
    # Results arrive in completion order, not input order
    return collector.results
```
5.3 asyncio.timeout vs. asyncio.wait_for
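```python
import asyncio


async def do_something():
    await asyncio.sleep(1)  # stand-in for real work
    return "done"


# Old pattern
async def old_timeout():
    try:
        result = await asyncio.wait_for(do_something(), timeout=5.0)
        return result
    except asyncio.TimeoutError:
        return None


# New pattern: the timeout is a block, so it can wrap several awaits
async def new_timeout():
    try:
        async with asyncio.timeout(5.0):
            return await do_something()
    except asyncio.TimeoutError:  # alias of the built-in TimeoutError since 3.11
        return None


# Or use asyncio.timeout_at() with an absolute deadline (in loop.time() units)
async def new_timeout_at():
    try:
        deadline = asyncio.get_running_loop().time() + 5.0
        async with asyncio.timeout_at(deadline):
            return await do_something()
    except asyncio.TimeoutError:
        return None
```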
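6. Advanced Topics: Integrating with the Existing Ecosystem
6.1 Integrating with FastAPI
Combining structured concurrency with FastAPI makes a strong pairing for modern async web development: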
```python
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Application starting...")
    yield
    print("Application shutting down...")


app = FastAPI(lifespan=lifespan)


@app.get("/concurrent-tasks")
async def run_concurrent_tasks():
    # The three lookups run concurrently. If one fails, the others are
    # cancelled and the resulting ExceptionGroup becomes a 500 response.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user_data())
        task2 = tg.create_task(fetch_orders())
        task3 = tg.create_task(fetch_recommendations())
    return {
        "user": task1.result(),
        "orders": task2.result(),
        "recommendations": task3.result(),
    }


async def fetch_user_data():
    await asyncio.sleep(0.5)
    return {"id": 1, "name": "Zhang San"}


async def fetch_orders():
    await asyncio.sleep(0.3)
    return [{"id": 101, "total": 299.99}]


async def fetch_recommendations():
    await asyncio.sleep(0.4)
    return ["Product A", "Product B"]

# Run with: uvicorn main:app --reload
```
6.2 Error-Handling Best Practices
Structured concurrency comes with a few error-handling patterns of its own:
```python
import asyncio
import functools
import random
from typing import Any, List, Tuple


class ErrorHandler:
    async def run_with_error_handling(
        self,
        coros: List,
        continue_on_error: bool = True,
    ) -> Tuple[List[Any], List[Tuple[str, Exception]]]:
        results = []
        errors = []
        if continue_on_error:
            # Each coroutine is wrapped, so a failure is recorded
            # instead of cancelling its siblings
            async with asyncio.TaskGroup() as tg:
                for i, coro in enumerate(coros):
                    tg.create_task(self._safe_execute(coro, i, results, errors))
        else:
            # Fail fast: the first exception cancels all remaining tasks
            try:
                async with asyncio.TaskGroup() as tg:
                    tasks = [tg.create_task(coro) for coro in coros]
                results = [(i, t.result()) for i, t in enumerate(tasks)]
            except* Exception as eg:
                for exc in eg.exceptions:
                    errors.append(("Fatal", exc))
        return results, errors

    async def _safe_execute(self, coro, index, results, errors):
        try:
            result = await coro
            results.append((index, result))
        except Exception as e:
            errors.append((f"Task-{index}", e))

    def retry_with_backoff(self, max_retries=3, base_delay=1.0, max_delay=60.0):
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                last_exception = None
                for attempt in range(max_retries):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as e:
                        last_exception = e
                        if attempt < max_retries - 1:
                            # Exponential backoff, capped at max_delay
                            delay = min(base_delay * (2 ** attempt), max_delay)
                            print(f"Retry {attempt + 1}/{max_retries}, waiting {delay}s...")
                            await asyncio.sleep(delay)
                raise last_exception
            return wrapper
        return decorator


async def unreliable_task(task_id):
    await asyncio.sleep(0.5)
    if random.random() < 0.3:
        raise ValueError(f"Task {task_id} failed randomly")
    return f"Task {task_id} done"


async def main():
    handler = ErrorHandler()
    retrying = handler.retry_with_backoff(max_retries=3, base_delay=0.1)(unreliable_task)
    # Pass coroutines, not pre-started tasks, so the TaskGroup owns them
    coros = [retrying(i) for i in range(10)]
    results, errors = await handler.run_with_error_handling(coros)
    print(f"\nSucceeded: {len(results)}")
    print(f"Failed: {len(errors)}")


asyncio.run(main())
```
6.3 Tuning the Concurrency Level
More concurrency is not always better; too much can exhaust system resources:
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class ConcurrencyOptimizer:
    initial_concurrency: int = 10
    min_concurrency: int = 1
    max_concurrency: int = 100
    target_latency_ms: float = 1000

    def __post_init__(self):
        self.current_concurrency = self.initial_concurrency
        self.latency_history: List[float] = []

    async def run_optimized(self, tasks: List[Callable]) -> tuple[List[Any], int]:
        semaphore = asyncio.Semaphore(self.current_concurrency)
        latencies = []

        async def bounded_task(task_func):
            async with semaphore:
                start = time.perf_counter()
                try:
                    result = await task_func()
                    # Measures the call only, not the wait for the semaphore
                    latencies.append((time.perf_counter() - start) * 1000)
                    return result
                except Exception:
                    latencies.append(-1)  # mark the failure
                    raise  # note: this aborts the whole TaskGroup

        async with asyncio.TaskGroup() as tg:
            running = [tg.create_task(bounded_task(task)) for task in tasks]
        results = [task.result() for task in running]
        self._adjust_concurrency(latencies)
        return results, self.current_concurrency

    def _adjust_concurrency(self, latencies):
        successful_latencies = [lat for lat in latencies if lat > 0]
        if not successful_latencies:
            self.current_concurrency = max(
                self.min_concurrency,
                self.current_concurrency // 2,
            )
            return
        avg_latency = sum(successful_latencies) / len(successful_latencies)
        self.latency_history.append(avg_latency)
        if avg_latency > self.target_latency_ms * 1.2:
            self.current_concurrency = max(
                self.min_concurrency,
                int(self.current_concurrency * 0.8),
            )
            print(f"Latency too high ({avg_latency:.0f}ms), lowering concurrency to {self.current_concurrency}")
        elif avg_latency < self.target_latency_ms * 0.8:
            new_concurrency = min(
                self.max_concurrency,
                int(self.current_concurrency * 1.2),
            )
            if new_concurrency != self.current_concurrency:
                self.current_concurrency = new_concurrency
                print(f"Latency OK ({avg_latency:.0f}ms), raising concurrency to {self.current_concurrency}")


async def sample_task(delay=0.1):
    await asyncio.sleep(delay)
    return f"done (delay: {delay}s)"


async def main():
    optimizer = ConcurrencyOptimizer(
        initial_concurrency=20,
        target_latency_ms=500,
    )
    tasks = [sample_task] * 100
    for attempt in range(3):
        print(f"\n=== Round {attempt + 1} ===")
        results, concurrency = await optimizer.run_optimized(tasks)
        print(f"Final concurrency: {concurrency}")


asyncio.run(main())
```
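The rule in _adjust_concurrency is a simple multiplicative band: above 120% of the target latency, concurrency shrinks by 20%; below 80%, it grows by 20%; otherwise it stays put. Pulled out as a pure function (next_concurrency is a name introduced here for illustration, and it omits the "no successes, halve it" branch), the arithmetic is easy to follow. With a 500 ms target and roughly 100 ms per call, about what sample_task's 0.1 s sleep produces:

```python
def next_concurrency(current, avg_latency_ms, target_ms, lo=1, hi=100):
    """Same band rule as _adjust_concurrency, as a pure function (illustrative)."""
    if avg_latency_ms > target_ms * 1.2:   # too slow: shrink by 20%
        return max(lo, int(current * 0.8))
    if avg_latency_ms < target_ms * 0.8:   # comfortably fast: grow by 20%
        return min(hi, int(current * 1.2))
    return current                         # inside the band: keep it


trajectory = [20]
for _ in range(3):
    trajectory.append(next_concurrency(trajectory[-1], avg_latency_ms=100, target_ms=500))
print(trajectory)  # [20, 24, 28, 33]

print(next_concurrency(20, avg_latency_ms=700, target_ms=500))  # 16
print(next_concurrency(4, avg_latency_ms=100, target_ms=500))   # 4
```

The last line shows a side effect of int() truncation: at 4 or below, int(current * 1.2) equals current, so growth stalls.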
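7. Hands-On Project: Building a High-Concurrency Crawler Framework
7.1 Overall Framework Design
Combining the techniques above, we can build a fairly complete high-concurrency crawler framework: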
```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Set
from urllib.parse import urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup  # third-party: pip install beautifulsoup4


@dataclass
class CrawlResult:
    url: str
    status_code: int
    content: str
    links: List[str] = field(default_factory=list)
    timestamp: datetime = field(default_factory=datetime.now)
    error: Optional[str] = None


@dataclass
class CrawlerConfig:
    max_concurrency: int = 10
    max_depth: int = 3
    max_urls: int = 1000
    timeout: float = 30.0
    retry_count: int = 3
    user_agent: str = "Mozilla/5.0 (compatible; AsyncCrawler/1.0)"


class AsyncCrawler:
    def __init__(self, config: CrawlerConfig):
        self.config = config
        self.visited: Set[str] = set()
        self.results: List[CrawlResult] = []
        self.failed_urls: List[tuple] = []
        self._semaphore = asyncio.Semaphore(config.max_concurrency)

    def _normalize_url(self, url: str, base: str) -> Optional[str]:
        try:
            parsed = urlparse(urljoin(base, url))  # resolves relative links
        except ValueError:
            return None
        if parsed.scheme not in ("http", "https"):
            return None  # skip mailto:, javascript:, etc.
        # Drop the query string and fragment
        return f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip("/")

    async def _crawl_page(
        self,
        session: aiohttp.ClientSession,
        url: str,
        depth: int,
    ) -> Optional[CrawlResult]:
        if (
            depth > self.config.max_depth
            or url in self.visited
            or len(self.visited) >= self.config.max_urls
        ):
            return None
        # Check-and-add happens before any await, so no two tasks claim the same URL
        self.visited.add(url)
        headers = {"User-Agent": self.config.user_agent}
        async with self._semaphore:
            for attempt in range(self.config.retry_count):
                try:
                    async with asyncio.timeout(self.config.timeout):
                        async with session.get(url, headers=headers) as response:
                            content = await response.text()
                            links = []
                            if response.status == 200:
                                soup = BeautifulSoup(content, "html.parser")
                                for a_tag in soup.find_all("a", href=True):
                                    link = self._normalize_url(a_tag["href"], url)
                                    if link:
                                        links.append(link)
                            return CrawlResult(
                                url=url,
                                status_code=response.status,
                                content=content[:10000],
                                links=links[:100],
                            )
                except asyncio.TimeoutError:
                    if attempt < self.config.retry_count - 1:
                        await asyncio.sleep(1 * (attempt + 1))  # linear backoff
                        continue
                    return CrawlResult(url=url, status_code=0, content="", error="timeout")
                except Exception as e:
                    if attempt < self.config.retry_count - 1:
                        await asyncio.sleep(1 * (attempt + 1))
                        continue
                    return CrawlResult(url=url, status_code=0, content="", error=str(e))
        return None

    async def _crawl_and_expand(self, tg, session, url, depth):
        result = await self._crawl_page(session, url, depth)
        if result is None:
            return
        self.results.append(result)
        if result.error:
            self.failed_urls.append((url, result.error))
            return
        if result.status_code == 200 and depth < self.config.max_depth:
            for link in result.links[:10]:
                if link not in self.visited:
                    # Child pages join the same TaskGroup, so the group only
                    # exits once the whole crawl tree has finished
                    tg.create_task(self._crawl_and_expand(tg, session, link, depth + 1))

    async def crawl(self, start_url: str) -> List[CrawlResult]:
        normalized_start = self._normalize_url(start_url, start_url)
        if normalized_start is None:
            return []
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrency * 2,
            limit_per_host=5,
        )
        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncio.TaskGroup() as tg:
                tg.create_task(self._crawl_and_expand(tg, session, normalized_start, 0))
        return self.results


async def main():
    config = CrawlerConfig(
        max_concurrency=20,
        max_depth=2,
        max_urls=100,
        timeout=30.0,
    )
    crawler = AsyncCrawler(config)
    results = await crawler.crawl("https://example.com")
    print("\nCrawl finished:")
    print(f"  Results: {len(results)}")
    print(f"  Visited: {len(crawler.visited)}")
    print(f"  Failed: {len(crawler.failed_urls)}")
    if results:  # avoid dividing by zero
        success_count = sum(1 for r in results if r.status_code == 200)
        print(f"  Success rate: {success_count / len(results) * 100:.1f}%")


asyncio.run(main())
```
8. Performance Tuning and Best Practices
8.1 Memory Management
In long-running async applications, memory management is critical:
```python
import asyncio
import gc
from dataclasses import dataclass
from typing import Any, Dict
from weakref import WeakValueDictionary


class AsyncResourceManager:
    def __init__(self, max_cached: int = 100):
        self.max_cached = max_cached
        # Weak references: an entry disappears by itself once nobody else holds
        # the resource. Values must support weakrefs (plain dicts do not).
        self._cache: WeakValueDictionary = WeakValueDictionary()
        self._locks: Dict[str, asyncio.Lock] = {}
        self._access_count: Dict[str, int] = {}

    def _get_lock(self, key: str) -> asyncio.Lock:
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        return self._locks[key]

    async def get_or_create(self, key: str, factory, *args, **kwargs) -> Any:
        # get() rather than "in" + [], since a weak entry can vanish in between
        resource = self._cache.get(key)
        if resource is not None:
            self._access_count[key] = self._access_count.get(key, 0) + 1
            return resource
        lock = self._get_lock(key)
        async with lock:
            # Re-check: another coroutine may have created it while we waited
            resource = self._cache.get(key)
            if resource is not None:
                return resource
            resource = await factory(*args, **kwargs)
            if len(self._cache) >= self.max_cached:
                self._evict_least_used()
            self._cache[key] = resource
            self._access_count[key] = 1
            return resource

    def _evict_least_used(self):
        if not self._access_count:
            return
        least_used_key = min(self._access_count, key=self._access_count.get)
        self._cache.pop(least_used_key, None)
        del self._access_count[least_used_key]
        print(f"Evicted from cache: {least_used_key}")

    async def cleanup(self):
        self._cache.clear()
        self._locks.clear()
        self._access_count.clear()
        gc.collect()
        print("Resource manager cleaned up")


@dataclass
class DatabaseConnection:
    # A class instance (unlike a dict) can be stored in a WeakValueDictionary
    pool_id: int
    connected: bool = True


resource_manager = AsyncResourceManager(max_cached=50)


async def create_database_connection(pool_id):
    await asyncio.sleep(0.1)  # simulate connection setup
    return DatabaseConnection(pool_id=pool_id)


async def main():
    conn1 = await resource_manager.get_or_create(
        "db_pool_1",
        create_database_connection,
        1,
    )
    print(f"Got connection: {conn1}")
    conn2 = await resource_manager.get_or_create("db_pool_1", create_database_connection, 1)
    # conn1 is still referenced, so the weak cache entry is alive
    print(f"Cache hit: {conn1 is conn2}")
    await resource_manager.cleanup()


asyncio.run(main())
```
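Two properties of WeakValueDictionary drive the design above: an entry vanishes as soon as the last strong reference to its value is dropped, and values must support weak references at all (plain dict instances do not). A minimal sketch, assuming CPython, where reference counting frees the object immediately; Resource is a hypothetical class:

```python
import weakref


class Resource:
    """Hypothetical resource type; ordinary class instances support weak references."""

    def __init__(self, name):
        self.name = name


cache = weakref.WeakValueDictionary()
res = Resource("db_pool_1")
cache["db_pool_1"] = res
alive_before = "db_pool_1" in cache

del res  # drop the only strong reference
alive_after = "db_pool_1" in cache  # gone right away on CPython

try:
    cache["plain"] = {"pool_id": 1}
    dict_error = None
except TypeError as exc:
    dict_error = str(exc)

print(alive_before, alive_after)  # True False
print(dict_error)  # cannot create weak reference to 'dict' object
```

So a weak cache only saves work while callers still hold the resource; if entries must survive between uses, a regular dict with explicit eviction is the simpler choice.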
8.2 Connection Pool Management
A database connection pool is a common requirement in async applications:
```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    user: str
    password: str
    max_connections: int = 10


class ConnectionPool:
    def __init__(self, config: DatabaseConfig):
        self.config = config
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=config.max_connections)
        self._connections: List = []

    async def initialize(self):
        for conn_id in range(self.config.max_connections):
            conn = await self._create_connection(conn_id)
            self._connections.append(conn)
            await self._pool.put(conn)
        print(f"Connection pool ready with {self.config.max_connections} connections")

    async def _create_connection(self, conn_id: int):
        await asyncio.sleep(0.1)  # simulate opening a connection
        # One id per connection (id(self) would give every connection the pool's id)
        return {"id": conn_id, "connected": True}

    async def execute_batch(self, queries: List[str]):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self._execute_query(query)) for query in queries]
        return [task.result() for task in tasks]

    async def _execute_query(self, query: str):
        conn = await self._pool.get()  # waits while every connection is busy
        try:
            await asyncio.sleep(0.1)  # simulate running the query
            return {"query": query, "conn_id": conn["id"], "result": []}
        finally:
            await self._pool.put(conn)  # always hand the connection back


async def main():
    config = DatabaseConfig(
        host="localhost",
        port=5432,
        database="mydb",
        user="admin",
        password="secret",
        max_connections=5,
    )
    pool = ConnectionPool(config)
    await pool.initialize()
    queries = [
        "SELECT * FROM users",
        "SELECT * FROM orders",
        "SELECT * FROM products",
        "SELECT * FROM categories",
        "SELECT * FROM reviews",
    ]
    results = await pool.execute_batch(queries)
    for result in results:
        print(f"Query: {result['query']} | connection ID: {result['conn_id']}")


asyncio.run(main())
```
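9. Summary and Outlook
Key Takeaways
- Structured concurrency is a major step forward for async Python: TaskGroup manages task lifetimes automatically
- Hierarchical cancellation solves the orphaned-task problem: when the parent is cancelled (or a sibling fails), child tasks are cancelled automatically
- asyncio.timeout() offers clearer, safer timeout handling
- shield() keeps critical operations from being cancelled by accident, although it cannot stop a TaskGroup from cancelling its own children
- Integration with the existing ecosystem is key: pay attention to how it fits with FastAPI, database connection pools, and so on
Suggested Learning Path
Beginner (1 to 2 weeks):
- Understand the basic asyncio concepts
- Master the async/await syntax
- Learn to use TaskGroup instead of bare create_task() calls
Intermediate (2 to 4 weeks):
- Understand how structured concurrency works under the hood
- Master exception-handling best practices
- Learn performance-tuning techniques
Advanced (1 month+):
- Understand the event loop in depth
- Master custom event loops and schedulers
- Be able to design complex async systems
Looking Ahead
The future of async programming in Python looks promising:
- Native structured concurrency: asyncio has shipped TaskGroup and timeout() since Python 3.11, and later releases (3.15+) are expected to build in more anyio/Trio patterns
- Better debugging tools: task tracing and visualization should keep improving
- Continued performance work: the event loop should keep getting faster
- Wider ecosystem support: major frameworks are expected to adopt the new patterns across the board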
