Python Async Programming Tutorial: Structured Concurrency in Practice with asyncio

1. Why Structured Concurrency Is a Game Changer

1.1 Three Nightmares of Plain asyncio

Let me start with a real scenario. Last month I wrote a batch download script that looked roughly like this:

python

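import asyncio
import aiohttp

# Placeholder URLs for illustration; substitute your own list.
urls = [
    "https://example.com/a.bin",
    "https://example.com/b.bin",
]

async def download_file(session, url):
    async with session.get(url) as response:
        return await response.read()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(download_file(session, url))
            tasks.append(task)
        
        # Problem: if one download raises, gather() re-raises right away
        # while the remaining tasks keep running unsupervised.
        results = await asyncio.gather(*tasks)
        return results

asyncio.run(main())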

This code looks fine, but once something goes wrong midway (a failed download, or a cancellation) you run into the following:

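Nightmare 1: orphaned tasks. Tasks started with asyncio.create_task() have no owner. If one download raises, gather() propagates the exception immediately but leaves the other tasks running, and a task that nobody is awaiting is not cancelled when the code that spawned it is cancelled. Such tasks keep running until they finish or the program exits. This is a "task leak".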
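The leak can be reproduced without any network access. The following is a minimal sketch, not the original script: worker() and demo() are hypothetical names, and asyncio.sleep() stands in for the download.

```python
import asyncio

async def worker(name, delay, log, fail=False):
    await asyncio.sleep(delay)  # stands in for real I/O
    if fail:
        raise RuntimeError(f"{name} failed")
    log.append(name)

async def demo():
    log = []
    failing = asyncio.create_task(worker("fast", 0.01, log, fail=True))
    sibling = asyncio.create_task(worker("slow", 0.05, log))
    still_running = None
    try:
        await asyncio.gather(failing, sibling)
    except RuntimeError:
        # gather() has already given up, but the sibling is still alive
        still_running = not sibling.done()
    await asyncio.sleep(0.1)  # give the orphan time to finish on its own
    return still_running, log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sibling finishes its work even though the caller stopped waiting for it, which is exactly the situation a task group is designed to rule out.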

Nightmare 2: timeout hell. I have seen timeouts handled like this:

python

async def with_timeout():
    task = asyncio.create_task(some_long_operation())
    try:
        result = await asyncio.wait_for(task, timeout=5.0)
        return result
    except asyncio.TimeoutError:
        task.cancel()
        try:
            await task  # wait until the task has actually been cancelled
        except asyncio.CancelledError:
            pass
        return None

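Nested try/except blocks like this become a disaster once several timeouts are involved. (asyncio.wait_for() already cancels the awaited task on timeout and waits for the cancellation to finish, so the manual cancel here is redundant; the same boilerplate is, however, genuinely required in code built on asyncio.wait() or on bare tasks.)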

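Nightmare 3: lost context. Once you start a background task, its relationship with the code that started it is severed. Cancelling the parent does not propagate to the child, and logging and error handling become fragmented.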

1.2 What Structured Concurrency Is

The core idea of structured concurrency is simple: the lifetime of a child task should be managed by its parent.

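Think of an Excel workbook: you create worksheets inside it, and when you close the workbook the worksheets close with it; you never close them one by one. Structured programming gives blocks of code the same kind of well-defined scope, and structured concurrency applies that idea to concurrent tasks.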

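The pattern was popularized by Trio (nurseries) and anyio (task groups), and asyncio has supported it natively since Python 3.11 through asyncio.TaskGroup and asyncio.timeout(). The main building blocks are:

  • TaskGroup: manages a group of child tasks; when the group is cancelled or one child fails, the remaining children are cancelled
  • Hierarchical cancellation: cancelling the parent task cancels all of its children
  • Shielding: asyncio.shield() protects a critical awaitable from being cancelled along with the code waiting on it
  • Structured exit: every child must have finished or been cancelled before the parent leaves the block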

2. TaskGroup: No More Orphaned Tasks

2.1 Basic Usage

TaskGroup is the heart of structured concurrency. Using it is straightforward:

python

import asyncio

async def task_a():
    print("Task A started")
    await asyncio.sleep(1)
    print("Task A finished")
    return "A"

async def task_b():
    print("Task B started")
    await asyncio.sleep(0.5)
    print("Task B finished")
    return "B"

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task_a())
        tg.create_task(task_b())
    
    print("All tasks finished")

asyncio.run(main())

Output:

plaintext

Task A started
Task B started
Task B finished
Task A finished
All tasks finished

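It looks much like the plain version, but the key difference is this: the async with block does not exit until every child task has finished. On a normal exit, TaskGroup waits for the children; if the body raises or main() is cancelled, it cancels them first and then waits.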

2.2 Cancellation Propagation

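This is the most exciting improvement:

python

import asyncio

async def long_task(name, delay):
    try:
        print(f"{name} started, takes {delay} s")
        await asyncio.sleep(delay)
        print(f"{name} finished")
        return f"{name} result"
    except asyncio.CancelledError:
        print(f"{name} cancelled")
        raise

async def main():
    try:
        # Cancel whatever is still running after 4 seconds
        async with asyncio.timeout(4):
            async with asyncio.TaskGroup() as tg:
                # Start three child tasks
                tg.create_task(long_task("Task 1", 10))
                tg.create_task(long_task("Task 2", 5))
                tg.create_task(long_task("Task 3", 3))
                # Leaving the block normally would WAIT for the children;
                # the timeout cancels main(), and the group passes that on.
    except TimeoutError:
        print("Timed out, remaining tasks were cancelled")

    print("TaskGroup exited")

# Run it to see the effect
asyncio.run(main())

Output (the order of the two "cancelled" lines may vary):

plaintext

Task 1 started, takes 10 s
Task 2 started, takes 5 s
Task 3 started, takes 3 s
Task 3 finished
Task 1 cancelled
Task 2 cancelled
Timed out, remaining tasks were cancelled
TaskGroup exited

Note what happened: leaving the async with block normally only waits for the children. They are cancelled when the block is left because of a cancellation or an exception, here the cancellation delivered by asyncio.timeout(). No manual task.cancel() calls are needed. This is "hierarchical cancellation".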

2.3 Exception Handling

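Another strength of TaskGroup is its exception handling:

python

import asyncio

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("something went wrong!")

async def normal_task():
    await asyncio.sleep(2)
    return "normal task finished"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task())
            tg.create_task(normal_task())  # cancelled when failing_task() raises
    except BaseExceptionGroup as e:
        print(f"Caught exception group: {e}")
        # Iterate over the individual exceptions
        for exc in e.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(main())

Output:

plaintext

Caught exception group: unhandled errors in a TaskGroup (1 sub-exception)
  - ValueError: something went wrong!

Only one exception is reported: normal_task() was cancelled by the group, and cancellations are not included in the exception group.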

3. In Practice: Refactoring a Crawler Project

3.1 The Old Code

Let me use a real crawler scenario to show the before and after. This is an image crawler I wrote a while ago:

python

import asyncio
import aiohttp
from pathlib import Path

class ImageCrawler:
    def __init__(self, concurrency=10):
        self.concurrency = concurrency
        self.results = []
        self.failed = []
        self._tasks = set()
    
    async def download_one(self, session, url, path):
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    content = await resp.read()
                    Path(path).write_bytes(content)
                    return True
                return False
        except Exception as e:
            print(f"Download failed {url}: {e}")
            return False
    
    async def crawl(self, urls):
        connector = aiohttp.TCPConnector(limit=self.concurrency)
        async with aiohttp.ClientSession(connector=connector) as session:
            for url in urls:
                task = asyncio.create_task(
                    self.download_one(session, url, f"images/{hash(url)}.jpg")
                )
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)
            
            # Wait for all tasks to finish
            await asyncio.gather(*self._tasks, return_exceptions=True)
        
        return self.results

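# Problem: if the user presses Ctrl+C midway, orphaned tasks are left behind
# Problem: exception handling is complicated
# Problem: there is no timeout control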

3.2 The New Code (with Structured Concurrency)

python

import asyncio
import aiohttp
from pathlib import Path

class ImageCrawler:
    def __init__(self, concurrency=10, timeout=30):
        self.concurrency = concurrency
        self.timeout = timeout
        self.results = []
        self.failed = []
    
    async def download_one(self, session, url, path, semaphore):
        async with semaphore:  # cap the number of concurrent downloads
            try:
                async with asyncio.timeout(self.timeout):
                    async with session.get(url) as resp:
                        if resp.status == 200:
                            content = await resp.read()
                            # Create the parent directory, not a directory at the file path
                            Path(path).parent.mkdir(parents=True, exist_ok=True)
                            Path(path).write_bytes(content)
                            self.results.append(url)
                            return True
                        self.failed.append((url, resp.status))
                        return False
            except asyncio.TimeoutError:
                self.failed.append((url, "timeout"))
                return False
            except Exception as e:
                self.failed.append((url, str(e)))
                return False
    
    async def crawl(self, urls):
        connector = aiohttp.TCPConnector(limit=self.concurrency)
        semaphore = asyncio.Semaphore(self.concurrency)
        
        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncio.TaskGroup() as tg:
                for url in urls:
                    tg.create_task(
                        self.download_one(
                            session, 
                            url, 
                            f"images/{hash(url)}.jpg",
                            semaphore
                        )
                    )
        
        print(f"Succeeded: {len(self.results)}, failed: {len(self.failed)}")
        return self.results

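# Advantages:
# 1. TaskGroup manages the lifetime of every task
# 2. On Ctrl+C, all in-flight tasks are cancelled automatically
# 3. Exceptions cannot leave orphaned tasks behind
# 4. asyncio.timeout() makes the timeout handling clearer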

3.3 A Complete Batch Download Example

python

import asyncio
import aiohttp
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class DownloadResult:
    url: str
    success: bool
    error: str = ""

class BatchDownloader:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = None
        self.results: List[DownloadResult] = []
    
    async def download(
        self, 
        session: aiohttp.ClientSession, 
        url: str, 
        path: str,
        timeout: float = 30.0
    ) -> DownloadResult:
        async with self.semaphore:
            try:
                async with asyncio.timeout(timeout):
                    async with session.get(url) as resp:
                        if resp.status == 200:
                            content = await resp.read()
                            Path(path).parent.mkdir(parents=True, exist_ok=True)
                            Path(path).write_bytes(content)
                            return DownloadResult(url=url, success=True)
                        else:
                            return DownloadResult(
                                url=url, 
                                success=False, 
                                error=f"HTTP {resp.status}"
                            )
            except asyncio.TimeoutError:
                return DownloadResult(url=url, success=False, error="timeout")
            except Exception as e:
                return DownloadResult(url=url, success=False, error=str(e))
    
    async def batch_download(
        self, 
        items: List[Tuple[str, str]],
        timeout: float = 30.0
    ) -> List[DownloadResult]:
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        
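        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(self.download(session, url, path, timeout))
                    for url, path in items
                ]
        
        # download() returns its result rather than storing it, so collect here
        self.results = [task.result() for task in tasks]
        return self.results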

# Usage example
async def main():
    downloader = BatchDownloader(max_concurrent=5)
    
    items = [
        ("https://example.com/image1.jpg", "downloads/img1.jpg"),
        ("https://example.com/image2.jpg", "downloads/img2.jpg"),
        ("https://example.com/image3.jpg", "downloads/img3.jpg"),
    ]
    
    results = await downloader.batch_download(items)
    
    success = sum(1 for r in results if r.success)
    print(f"Download finished: {success}/{len(results)} succeeded")

asyncio.run(main())

4. Shield: Protecting Critical Tasks

4.1 When You Need Shield

Sometimes you do not want a piece of work to be cancelled just because the code waiting on it gave up. For example:

python

import asyncio

async def save_to_database(data):
    """A critical task that must not be cancelled by accident."""
    await asyncio.sleep(2)  # simulate a database write
    print("Data saved")
    return True

async def fetch_data():
    """Fetch data; may be cancelled."""
    await asyncio.sleep(1)
    return {"key": "value"}

async def main():
    async with asyncio.TaskGroup() as tg:
        save_task = tg.create_task(save_to_database({"critical": True}))
        
        try:
            await asyncio.wait_for(
                asyncio.shield(save_task),
                timeout=0.5
            )
        except asyncio.TimeoutError:
            print("Main task timed out, but save_to_database keeps running")
        
        # save_task keeps running; the group waits for it on exit

asyncio.run(main())

4.2 A Practical Scenario

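Graceful shutdown is where this matters most: cleanup has to run to completion even though the rest of the program is being cancelled. The example below covers the signal-handling half: it turns SIGTERM/SIGINT into an event, runs the shutdown routine, and only then cancels the server task (loop.add_signal_handler() is available on Unix only):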

python

import signal
import asyncio

class GracefulShutdown:
    def __init__(self):
        self.shutdown_complete = asyncio.Event()
        self.connections = []
    
    async def handle_request(self, reader, writer):
        addr = writer.get_extra_info('peername')
        print(f"New connection: {addr}")
        
        try:
            data = await reader.read(1024)
            writer.write(b"ACK")
            await writer.drain()
        finally:
            writer.close()
            await writer.wait_closed()
            print(f"Connection closed: {addr}")
    
    async def run_server(self):
        server = await asyncio.start_server(
            self.handle_request, '127.0.0.1', 8888
        )
        
        async with server:
            await server.serve_forever()
    
    async def shutdown(self):
        print("Shutting down...")
        
        async def _do_shutdown():
            for sock in self.connections:
                sock.close()
            
            # Grace period for in-flight requests (placeholder for real draining)
            await asyncio.sleep(5)
            
            self.shutdown_complete.set()
            print("Shutdown complete")
        
        await _do_shutdown()

async def main():
    app = GracefulShutdown()
    
    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()
    
    def signal_handler():
        shutdown_event.set()
    
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)
    
    server_task = asyncio.create_task(app.run_server())
    await shutdown_event.wait()
    await app.shutdown()
    
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

5. Migration Guide: From Legacy Code to Structured Concurrency

5.1 Common Legacy Patterns and Their Replacements

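Old pattern | New pattern | Notes
asyncio.create_task() + manual bookkeeping | TaskGroup.create_task() | TaskGroup manages the task lifecycle
asyncio.wait_for() + manual cancel | asyncio.timeout() | Clearer timeout handling
gather(return_exceptions=True) | TaskGroup + exception groups | Failures surface together as an ExceptionGroup
Manual task.cancel() + await task | Automatic hierarchical cancellation | No more boilerplate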

5.2 An Incremental Migration Strategy

python

# Old code
async def old_pattern():
    tasks = []
    for item in items:
        task = asyncio.create_task(process(item))
        tasks.append(task)
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

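# New code, pattern 1: simple replacement
# (unlike gather(return_exceptions=True), the first failure cancels the
# siblings and the block raises an ExceptionGroup)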
async def new_pattern_simple():
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(process(item)) 
            for item in items
        ]
    return [task.result() for task in tasks]

# New code, pattern 2: a result collector
class ResultCollector:
    def __init__(self):
        self.results = []
        self._lock = asyncio.Lock()
    
    async def process_and_collect(self, item):
        result = await process(item)
        async with self._lock:
            self.results.append(result)

async def new_pattern_with_results():
    collector = ResultCollector()
    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(collector.process_and_collect(item))
    return collector.results

5.3 asyncio.timeout vs asyncio.wait_for

python

# Old pattern
async def old_timeout():
    try:
        result = await asyncio.wait_for(do_something(), timeout=5.0)
        return result
    except asyncio.TimeoutError:
        return None

# New pattern: more concise
async def new_timeout():
    try:
        async with asyncio.timeout(5.0):
            return await do_something()
    except asyncio.TimeoutError:
        return None

# Or use asyncio.timeout_at to specify an absolute deadline
async def new_timeout_at():
    try:
        deadline = asyncio.get_running_loop().time() + 5.0
        async with asyncio.timeout_at(deadline):
            return await do_something()
    except asyncio.TimeoutError:
        return None

6. Advanced Topics: Integrating with the Existing Ecosystem

6.1 Integrating with FastAPI

Structured concurrency fits naturally into FastAPI handlers, where a single request often fans out into several independent lookups:

python

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Application starting...")
    yield
    print("Application shutting down...")

app = FastAPI(lifespan=lifespan)

@app.get("/concurrent-tasks")
async def run_concurrent_tasks():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user_data())
        task2 = tg.create_task(fetch_orders())
        task3 = tg.create_task(fetch_recommendations())
    
    return {
        "user": task1.result(),
        "orders": task2.result(),
        "recommendations": task3.result()
    }

async def fetch_user_data():
    await asyncio.sleep(0.5)
    return {"id": 1, "name": "Zhang San"}

async def fetch_orders():
    await asyncio.sleep(0.3)
    return [{"id": 101, "total": 299.99}]

async def fetch_recommendations():
    await asyncio.sleep(0.4)
    return ["Product A", "Product B"]

# uvicorn main:app --reload

6.2 Error Handling Best Practices

Structured concurrency brings a few error-handling patterns of its own:

python

import asyncio
from typing import List, Tuple, Any

class ErrorHandler:
    def __init__(self):
        self.errors: List[Tuple[str, Exception]] = []
    
    async def run_with_error_handling(
        self,
        tasks: List[asyncio.Task],
        continue_on_error: bool = True
    ) -> Tuple[List[Any], List[Tuple[str, Exception]]]:
        results = []
        errors = []
        
        if continue_on_error:
            async with asyncio.TaskGroup() as tg:
                for i, task in enumerate(tasks):
                    tg.create_task(self._safe_execute(task, i, results, errors))
        else:
            try:
                async with asyncio.TaskGroup() as tg:
                    for i, task in enumerate(tasks):
                        tg.create_task(self._safe_execute(task, i, results, errors))
            except* Exception as eg:
                for exc in eg.exceptions:
                    errors.append(("Fatal", exc))
        
        return results, errors
    
    async def _safe_execute(self, task, index, results, errors):
        try:
            result = await task
            results.append((index, result))
        except Exception as e:
            errors.append((f"Task-{index}", e))
    
    def retry_with_backoff(self, max_retries=3, base_delay=1.0, max_delay=60.0):
        def decorator(func):
            async def wrapper(*args, **kwargs):
                last_exception = None
                for attempt in range(max_retries):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as e:
                        last_exception = e
                        if attempt < max_retries - 1:
                            delay = min(base_delay * (2 ** attempt), max_delay)
                            print(f"Retry {attempt + 1}/{max_retries}, waiting {delay} s...")
                            await asyncio.sleep(delay)
                raise last_exception
            return wrapper
        return decorator

async def unreliable_task(task_id):
    import random
    await asyncio.sleep(0.5)
    if random.random() < 0.3:
        raise ValueError(f"Task {task_id} failed randomly")
    return f"Task {task_id} finished"

async def main():
    handler = ErrorHandler()
    tasks = [asyncio.create_task(unreliable_task(i)) for i in range(10)]
    results, errors = await handler.run_with_error_handling(tasks)
    
    print(f"\nSucceeded: {len(results)}")
    print(f"Failed: {len(errors)}")

asyncio.run(main())
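The retry-with-backoff idea can be exercised on its own. The sketch below is a standalone decorator written for illustration, not the ErrorHandler method; flaky() and the attempts counter are hypothetical, and the delays are kept tiny so the example finishes quickly.

```python
import asyncio
import functools

def retry_with_backoff(max_retries=3, base_delay=0.01, max_delay=1.0):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise  # out of attempts: surface the last error
                    # Exponential backoff: base, 2*base, 4*base, ... capped
                    await asyncio.sleep(min(base_delay * 2 ** attempt, max_delay))
        return wrapper
    return decorator

attempts = {"count": 0}

@retry_with_backoff(max_retries=3)
async def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

if __name__ == "__main__":
    print(asyncio.run(flaky()), attempts["count"])
```

Catching Exception rather than BaseException matters here: asyncio.CancelledError passes straight through, so a retried task can still be cancelled by its task group.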

6.3 Tuning the Concurrency Level

More concurrency is not always better; too many concurrent tasks can exhaust system resources:

python

import asyncio
import time
from dataclasses import dataclass
from typing import Callable, Any, List

@dataclass
class ConcurrencyOptimizer:
    initial_concurrency: int = 10
    min_concurrency: int = 1
    max_concurrency: int = 100
    target_latency_ms: float = 1000
    
    def __post_init__(self):
        self.current_concurrency = self.initial_concurrency
        self.latency_history: List[float] = []
    
    async def run_optimized(self, tasks: List[Callable]) -> tuple[List[Any], int]:
        semaphore = asyncio.Semaphore(self.current_concurrency)
        results = []
        latencies = []
        
        async def bounded_task(task_func):
            async with semaphore:
                start = time.time()
                try:
                    result = await task_func()
                    latency = (time.time() - start) * 1000
                    latencies.append(latency)
                    return result
                except Exception as e:
                    latencies.append(-1)
                    raise
        
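        async with asyncio.TaskGroup() as tg:
            running = [tg.create_task(bounded_task(task)) for task in tasks]
        
        # All children are done here, so result() cannot block
        results = [t.result() for t in running]
        self._adjust_concurrency(latencies)
        return results, self.current_concurrency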
    
    def _adjust_concurrency(self, latencies):
        successful_latencies = [l for l in latencies if l > 0]
        
        if not successful_latencies:
            self.current_concurrency = max(
                self.min_concurrency,
                self.current_concurrency // 2
            )
            return
        
        avg_latency = sum(successful_latencies) / len(successful_latencies)
        self.latency_history.append(avg_latency)
        
        if avg_latency > self.target_latency_ms * 1.2:
            self.current_concurrency = max(
                self.min_concurrency,
                int(self.current_concurrency * 0.8)
            )
            print(f"Latency too high ({avg_latency:.0f}ms), lowering concurrency to {self.current_concurrency}")
        elif avg_latency < self.target_latency_ms * 0.8:
            new_concurrency = min(
                self.max_concurrency,
                int(self.current_concurrency * 1.2)
            )
            if new_concurrency != self.current_concurrency:
                self.current_concurrency = new_concurrency
                print(f"Latency is good ({avg_latency:.0f}ms), raising concurrency to {self.current_concurrency}")

async def sample_task(delay=0.1):
    await asyncio.sleep(delay)
    return f"done (delay: {delay}s)"

async def main():
    optimizer = ConcurrencyOptimizer(
        initial_concurrency=20,
        target_latency_ms=500
    )
    tasks = [sample_task] * 100
    
    for attempt in range(3):
        print(f"\n=== Round {attempt + 1} ===")
        results, concurrency = await optimizer.run_optimized(tasks)
        print(f"Concurrency after this round: {concurrency}")

asyncio.run(main())
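The adjustment rule is easier to reason about once it is isolated from the I/O. The function below restates the rule used above (back off by 20% when the average latency exceeds 120% of the target, speed up by 20% when it is under 80%) as a pure function; next_concurrency() is a hypothetical name.

```python
def next_concurrency(current, avg_latency_ms, target_ms, lo=1, hi=100):
    """Return the concurrency level to use for the next round."""
    if avg_latency_ms > target_ms * 1.2:
        return max(lo, int(current * 0.8))   # too slow: back off
    if avg_latency_ms < target_ms * 0.8:
        return min(hi, int(current * 1.2))   # plenty of headroom: speed up
    return current                           # inside the dead band: hold

if __name__ == "__main__":
    level = 20
    for observed in (100, 100, 900, 500):
        level = next_concurrency(level, observed, target_ms=500)
        print(observed, "->", level)
```

With a 500 ms target, a round averaging 700 ms takes 20 down to 16, one averaging 300 ms takes 20 up to 24, and anything between 400 and 600 ms leaves the level unchanged.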

7. Hands-On Project: Building a High-Concurrency Crawler Framework

7.1 The Complete Framework Design

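Putting the pieces together, here is a sketch of a high-concurrency crawler framework (in addition to aiohttp it requires beautifulsoup4 for link extraction):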

python

import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from datetime import datetime
from urllib.parse import urljoin, urlparse

@dataclass
class CrawlResult:
    url: str
    status_code: int
    content: str
    links: List[str] = field(default_factory=list)
    timestamp: datetime = field(default_factory=datetime.now)
    error: Optional[str] = None

@dataclass
class CrawlerConfig:
    max_concurrency: int = 10
    max_depth: int = 3
    max_urls: int = 1000
    timeout: float = 30.0
    retry_count: int = 3
    user_agent: str = "Mozilla/5.0 (compatible; AsyncCrawler/1.0)"

class AsyncCrawler:
    def __init__(self, config: CrawlerConfig):
        self.config = config
        self.visited: Set[str] = set()
        self.results: List[CrawlResult] = []
        self.failed_urls: List[tuple] = []
        self._semaphore = asyncio.Semaphore(config.max_concurrency)
    
    def _normalize_url(self, url: str, base: str) -> Optional[str]:
        try:
            if not url.startswith(('http://', 'https://')):
                url = urljoin(base, url)
            parsed = urlparse(url)
            normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
            return normalized.rstrip('/')
        except Exception:
            return None
    
    async def _crawl_page(
        self,
        session: aiohttp.ClientSession,
        url: str,
        depth: int
    ) -> Optional[CrawlResult]:
        if depth > self.config.max_depth or url in self.visited:
            return None
        
        self.visited.add(url)
        
        async with self._semaphore:
            for attempt in range(self.config.retry_count):
                try:
                    headers = {'User-Agent': self.config.user_agent}
                    
                    async with asyncio.timeout(self.config.timeout):
                        async with session.get(url, headers=headers) as response:
                            content = await response.text()
                            links = []
                            
                            if response.status == 200:
                                from bs4 import BeautifulSoup
                                soup = BeautifulSoup(content, 'html.parser')
                                for a_tag in soup.find_all('a', href=True):
                                    link = self._normalize_url(a_tag['href'], url)
                                    if link:
                                        links.append(link)
                            
                            return CrawlResult(
                                url=url,
                                status_code=response.status,
                                content=content[:10000],
                                links=links[:100]
                            )
                
                except asyncio.TimeoutError:
                    if attempt < self.config.retry_count - 1:
                        await asyncio.sleep(1 * (attempt + 1))
                        continue
                    return CrawlResult(url=url, status_code=0, content="", error="timeout")
                
                except Exception as e:
                    if attempt < self.config.retry_count - 1:
                        await asyncio.sleep(1 * (attempt + 1))
                        continue
                    return CrawlResult(url=url, status_code=0, content="", error=str(e))
        
        return None
    
    async def crawl(self, start_url: str) -> List[CrawlResult]:
        normalized_start = self._normalize_url(start_url, start_url)
        
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrency * 2,
            limit_per_host=5
        )
        
        async def visit(tg, session, url, depth):
            # Stop scheduling new pages once the URL budget is spent
            if len(self.visited) >= self.config.max_urls:
                return
            result = await self._crawl_page(session, url, depth)
            if result is None:
                return
            self.results.append(result)
            if result.error:
                self.failed_urls.append((url, result.error))
            if result.status_code == 200:
                for link in result.links[:10]:
                    if link not in self.visited:
                        # Children join the same group, so crawl() cannot
                        # return while any page is still being fetched
                        tg.create_task(visit(tg, session, link, depth + 1))
        
        async with aiohttp.ClientSession(connector=connector) as session:
            async with asyncio.TaskGroup() as tg:
                tg.create_task(visit(tg, session, normalized_start, 0))
        
        return self.results

async def main():
    config = CrawlerConfig(
        max_concurrency=20,
        max_depth=2,
        max_urls=100,
        timeout=30.0
    )
    
    crawler = AsyncCrawler(config)
    results = await crawler.crawl("https://example.com")
    
    print("\nCrawl finished:")
    print(f"  Results: {len(results)}")
    print(f"  Visited: {len(crawler.visited)}")
    print(f"  Failed: {len(crawler.failed_urls)}")
    
    success_count = sum(1 for r in results if r.status_code == 200)
    if results:  # avoid dividing by zero when nothing was fetched
        print(f"  Success rate: {success_count / len(results) * 100:.1f}%")

asyncio.run(main())

8. Performance Tuning and Best Practices

8.1 Memory Management

In long-running async applications, memory management is critical:

python

import asyncio
import gc
from typing import Any, Dict, Optional
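# A plain dict is used for the cache: WeakValueDictionary cannot hold
# built-in dict values (they do not support weak references), and
# eviction is handled explicitly by _evict_least_used().

class AsyncResourceManager:
    def __init__(self, max_cached: int = 100):
        self.max_cached = max_cached
        self._cache: Dict[str, Any] = {}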
        self._locks: Dict[str, asyncio.Lock] = {}
        self._access_count: Dict[str, int] = {}
    
    def _get_lock(self, key: str) -> asyncio.Lock:
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        return self._locks[key]
    
    async def get_or_create(self, key: str, factory, *args, **kwargs) -> Any:
        if key in self._cache:
            self._access_count[key] = self._access_count.get(key, 0) + 1
            return self._cache[key]
        
        lock = self._get_lock(key)
        async with lock:
            if key in self._cache:
                return self._cache[key]
            
            resource = await factory(*args, **kwargs)
            
            if len(self._cache) >= self.max_cached:
                self._evict_least_used()
            
            self._cache[key] = resource
            self._access_count[key] = 1
            
            return resource
    
    def _evict_least_used(self):
        if not self._access_count:
            return
        least_used_key = min(self._access_count, key=self._access_count.get)
        if least_used_key in self._cache:
            del self._cache[least_used_key]
        if least_used_key in self._access_count:
            del self._access_count[least_used_key]
        print(f"Evicted from cache: {least_used_key}")
    
    async def cleanup(self):
        self._cache.clear()
        self._locks.clear()
        self._access_count.clear()
        gc.collect()
        print("Resource manager cleaned up")

resource_manager = AsyncResourceManager(max_cached=50)

async def create_database_connection(pool_id):
    await asyncio.sleep(0.1)
    return {"pool_id": pool_id, "connected": True}

async def main():
    conn1 = await resource_manager.get_or_create(
        "db_pool_1",
        create_database_connection,
        1
    )
    print(f"Got connection: {conn1}")
    
    conn2 = await resource_manager.get_or_create("db_pool_1", create_database_connection, 1)
    print(f"Cache hit: {conn1 is conn2}")
    
    await resource_manager.cleanup()

asyncio.run(main())

8.2 Connection Pool Management

Database connection pools are a common requirement in async applications:

python

import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    user: str
    password: str
    max_connections: int = 10

class ConnectionPool:
    def __init__(self, config: DatabaseConfig):
        self.config = config
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=config.max_connections)
        self._connections: List = []
    
    async def initialize(self):
        for _ in range(self.config.max_connections):
            conn = await self._create_connection()
            self._connections.append(conn)
            await self._pool.put(conn)
        print(f"Connection pool initialized with {self.config.max_connections} connections")
    
    async def _create_connection(self):
        await asyncio.sleep(0.1)
        # Number the simulated connections so they can be told apart
        return {"id": len(self._connections) + 1, "connected": True}
    
    async def execute_batch(self, queries: List[str]):
        results = []
        
        async with asyncio.TaskGroup() as tg:
            for query in queries:
                task = tg.create_task(self._execute_query(query))
                results.append(task)
        
        return [task.result() for task in results]
    
    async def _execute_query(self, query: str):
        conn = await self._pool.get()
        try:
            await asyncio.sleep(0.1)
            return {"query": query, "conn_id": conn["id"], "result": []}
        finally:
            await self._pool.put(conn)

async def main():
    config = DatabaseConfig(
        host="localhost",
        port=5432,
        database="mydb",
        user="admin",
        password="secret",
        max_connections=5
    )
    
    pool = ConnectionPool(config)
    await pool.initialize()
    
    queries = [
        "SELECT * FROM users",
        "SELECT * FROM orders",
        "SELECT * FROM products",
        "SELECT * FROM categories",
        "SELECT * FROM reviews",
    ]
    
    results = await pool.execute_batch(queries)
    
    for result in results:
        print(f"Query: {result['query']} | connection ID: {result['conn_id']}")

asyncio.run(main())

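9. Summary and Outlook

Key Takeaways

  1. Structured concurrency is a major step forward for async Python: TaskGroup manages task lifetimes automatically
  2. Hierarchical cancellation solves the orphaned-task problem: cancelling the parent cancels its children
  3. asyncio.timeout() offers a clearer, safer way to handle timeouts
  4. Shielding keeps critical work from being cancelled by accident
  5. Integration with the existing ecosystem matters; pay attention to how the pattern interacts with FastAPI, database connection pools, and similar components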

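Suggested Learning Path

Beginner (1-2 weeks)

  • Understand the basic asyncio concepts
  • Master the async/await syntax
  • Learn to use TaskGroup in place of bare create_task

Intermediate (2-4 weeks)

  • Understand the principles behind structured concurrency
  • Master best practices for exception handling
  • Learn performance tuning techniques

Advanced (1 month or more)

  • Understand the event loop in depth
  • Master custom schedulers
  • Be able to design complex asynchronous systems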

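Outlook

There is a lot to look forward to in async Python:

  • Structured concurrency in the standard library: asyncio.TaskGroup and asyncio.timeout() are already available from Python 3.11 onward, and the Trio/anyio style should increasingly become the default way to write asyncio code
  • Better debugging tools: task tracing and visualization should keep improving
  • Continued performance work: event loop performance should keep getting better
  • Broader ecosystem support: mainstream frameworks are expected to adopt the new patterns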
