def fibonacci(n):
    """
    Compute the n-th term of the Fibonacci sequence.

    Args:
        n: Position of the term in the sequence (0 -> 0, 1 -> 1, 2 -> 1, ...).

    Returns:
        The value of the n-th Fibonacci term; 0 for any n <= 0.
    """
    # Guard clauses for the two base cases.
    if n <= 0:
        return 0
    if n == 1:
        return 1

    previous, current = 0, 1
    # Each step slides the (previous, current) window one term forward.
    for _step in range(n - 1):
        previous, current = current, previous + current
    return current
def quicksort(arr):
    """Return a new list holding the elements of ``arr`` in ascending order.

    The input is never modified and is never returned itself, so callers can
    mutate the result safely.

    Args:
        arr: Iterable of mutually comparable elements.

    Returns:
        A new sorted list. Elements that compare neither less than nor greater
        than the pivot (equal elements, but also unordered values such as NaN)
        are kept next to the pivot instead of being dropped, so the result
        always has the same length as the input.
    """
    items = list(arr)
    if len(items) <= 1:
        return items

    pivot = items[len(items) // 2]
    left, middle, right = [], [], []
    # Single partition pass; the catch-all branch guarantees no element is lost
    # and that `middle` is non-empty, so the recursion always shrinks.
    for item in items:
        if item < pivot:
            left.append(item)
        elif item > pivot:
            right.append(item)
        else:
            middle.append(item)
    return quicksort(left) + middle + quicksort(right)
# monitoring_dashboard.py
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
@dataclass
class MetricPoint:
    """A single timestamped sample of a named metric."""

    timestamp: datetime  # when the sample was recorded
    name: str  # metric name used to group samples
    value: float  # sampled value
    # Free-form labels attached to the sample. Optional: each instance gets its
    # own fresh dict so instances never share a mutable default.
    labels: dict = field(default_factory=dict)
@dataclass
class Alert:
    """A triggered alert; it counts as active while ``resolved_at`` is None."""

    alert_id: str  # identifier assigned by whoever raises the alert
    severity: str  # severity label, e.g. "critical" or "warning"
    message: str  # human-readable description of what triggered the alert
    triggered_at: datetime  # when the alert fired
    # Set once the alert is resolved; None means it is still open.
    resolved_at: Optional[datetime] = None
class MonitoringDashboard:
    """In-memory metric store with threshold-based alerting and a summary view.

    Samples are appended to ``self.metrics`` and are never evicted by this
    class, so long-running callers are responsible for trimming the list.
    """

    # Defaults used by get_dashboard_summary when the caller does not override them.
    # NOTE(review): the default summary names do not include "latency_p99",
    # which is the latency metric the alert rules watch — confirm which name
    # producers actually record.
    DEFAULT_SUMMARY_WINDOW = 300
    DEFAULT_SUMMARY_METRICS = ("latency", "throughput", "gpu_utilization", "error_rate")

    def __init__(self):
        self.metrics: List[MetricPoint] = []
        self.alerts: List[Alert] = []
        # metric name -> {"threshold": value the windowed average must exceed,
        #                 "window": look-back period in seconds}
        self.alert_rules = {
            "latency_p99": {"threshold": 2000, "window": 300},
            "error_rate": {"threshold": 0.01, "window": 60},
            "gpu_utilization": {"threshold": 0.95, "window": 60},
        }

    def record_metric(self, name: str, value: float, labels: Optional[dict] = None):
        """Append one sample, timestamped with the current local time.

        Args:
            name: Metric name.
            value: Sampled value.
            labels: Optional labels for the sample; an empty dict is stored
                when omitted.
        """
        self.metrics.append(MetricPoint(
            timestamp=datetime.now(),
            name=name,
            value=value,
            labels=labels or {},
        ))

    def _recent_points(self, name: str, window: float, now: datetime) -> list:
        """Return the samples of metric ``name`` recorded less than ``window`` seconds before ``now``."""
        return [
            m for m in self.metrics
            if m.name == name and (now - m.timestamp).total_seconds() < window
        ]

    def check_alerts(self) -> List["Alert"]:
        """Evaluate every alert rule and raise an alert for each one that is exceeded.

        For each rule, the samples of that metric inside the rule's window are
        averaged; an alert is created when the average is strictly greater
        than the threshold. Rules with no recent samples are skipped.

        Every call that finds a rule exceeded appends a new alert, so polling
        while a condition persists produces one alert per call.

        Returns:
            The alerts created by this call (they are also appended to
            ``self.alerts``).
        """
        new_alerts = []
        # One clock reading so all rules are evaluated against the same instant.
        now = datetime.now()
        for metric_name, rule in self.alert_rules.items():
            recent = self._recent_points(metric_name, rule["window"], now)
            if not recent:
                continue
            avg_value = sum(m.value for m in recent) / len(recent)
            if avg_value > rule["threshold"]:
                alert = Alert(
                    alert_id=f"alert-{len(self.alerts)}",
                    severity="critical" if metric_name == "error_rate" else "warning",
                    # NOTE: .2f rounds small ratios, e.g. 0.012 is printed as 0.01.
                    message=f"{metric_name} 超过阈值: {avg_value:.2f} > {rule['threshold']}",
                    triggered_at=now,
                )
                new_alerts.append(alert)
                self.alerts.append(alert)
        return new_alerts

    def get_dashboard_summary(
        self,
        window_seconds: float = DEFAULT_SUMMARY_WINDOW,
        metric_names: Optional[List[str]] = None,
    ) -> dict:
        """Summarise recent samples and currently open alerts.

        Args:
            window_seconds: Look-back period in seconds; defaults to 300.
            metric_names: Metric names to summarise; defaults to
                ``DEFAULT_SUMMARY_METRICS``.

        Returns:
            A dict with ``total_metrics`` (all stored samples),
            ``active_alerts`` (alerts whose ``resolved_at`` is None) and
            ``metrics_summary`` mapping each requested name to its ``count``,
            ``avg``, ``max`` and ``min`` over the window (all 0 when there are
            no recent samples).
        """
        now = datetime.now()
        names = self.DEFAULT_SUMMARY_METRICS if metric_names is None else metric_names

        metrics_summary = {}
        for name in names:
            values = [p.value for p in self._recent_points(name, window_seconds, now)]
            metrics_summary[name] = {
                "count": len(values),
                "avg": sum(values) / len(values) if values else 0,
                "max": max(values, default=0),
                "min": min(values, default=0),
            }

        return {
            "total_metrics": len(self.metrics),
            "active_alerts": sum(1 for a in self.alerts if a.resolved_at is None),
            "metrics_summary": metrics_summary,
        }
# 十、性能调优实战案例
# 10.1 批处理优化
# 批处理是提升推理吞吐量的关键优化手段,下面是一个 Python 示例:
# batch_optimizer.py
import asyncio
import time
from dataclasses import dataclass
from typing import List
from collections import deque
@dataclass
class InferenceRequest:
    """A queued inference request together with the future that will carry its result."""
    request_id: str  # caller-supplied identifier
    prompt: str  # input text for the request
    max_length: int  # NOTE(review): presumably a generation length limit — confirm; only stored by the visible code
    created_at: float  # time.time() timestamp taken when the request is enqueued
    future: asyncio.Future  # completed by the batch processor with the result or an exception
class BatchedInference:
    """Group concurrent inference requests into batches and run them together.

    Callers await ``add_request``; a single background worker drains the
    pending queue, runs each batch through ``_run_inference`` and completes
    every request's future with its result (or with the exception that the
    batch raised).
    """

    # Seconds to sleep between polls while a partial batch waits to fill, and
    # between consecutive batches so other tasks get a chance to run.
    _POLL_INTERVAL = 0.001

    def __init__(
        self,
        model,
        max_batch_size: int = 32,
        max_wait_time: float = 0.1,
        max_sequence_length: int = 2048
    ):
        """
        Args:
            model: Model handle; stored but not called by the placeholder
                ``_run_inference``.
            max_batch_size: Maximum number of requests per batch (>= 1).
            max_wait_time: Maximum seconds a request may wait in the queue for
                its batch to fill, measured from the request's ``created_at``.
            max_sequence_length: Stored on the instance; not read by this class.

        Raises:
            ValueError: If ``max_batch_size`` is smaller than 1 (a batch could
                never be formed and the worker would spin forever).
        """
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be at least 1")
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.max_sequence_length = max_sequence_length
        self.pending_requests: deque[InferenceRequest] = deque()
        self.processing = False
        # Strong reference to the worker task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._worker_task = None

    async def add_request(
        self,
        request_id: str,
        prompt: str,
        max_length: int = 2048
    ) -> str:
        """Queue a request and wait for its result.

        Args:
            request_id: Caller-supplied identifier.
            prompt: Input text.
            max_length: Stored on the request; defaults to 2048.

        Returns:
            The result string produced for this request.

        Raises:
            Exception: Whatever the batch containing this request raised.
        """
        future = asyncio.get_running_loop().create_future()
        request = InferenceRequest(
            request_id=request_id,
            prompt=prompt,
            max_length=max_length,
            created_at=time.time(),
            future=future
        )
        self.pending_requests.append(request)
        if not self.processing:
            # Flip the flag synchronously: the task body does not run until the
            # next await, so without this every concurrent caller would spawn
            # its own worker.
            self.processing = True
            self._worker_task = asyncio.create_task(self._process_batch())
        return await future

    async def _collect_batch(self):
        """Pop the next batch, waiting up to ``max_wait_time`` for it to fill.

        Must be called with at least one pending request. The deadline is
        anchored to the oldest request's ``created_at`` so no request waits in
        the queue longer than ``max_wait_time`` before its batch is dispatched.
        """
        batch = [self.pending_requests.popleft()]
        deadline = batch[0].created_at + self.max_wait_time
        while len(batch) < self.max_batch_size:
            if self.pending_requests:
                batch.append(self.pending_requests.popleft())
                continue
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            await asyncio.sleep(min(remaining, self._POLL_INTERVAL))
        return batch

    async def _process_batch(self):
        """Worker loop: drain the pending queue batch by batch until it is empty."""
        self.processing = True
        try:
            while self.pending_requests:
                batch = await self._collect_batch()
                try:
                    results = await self._run_inference(batch)
                    if len(results) != len(batch):
                        # zip() would silently truncate and leave callers hanging.
                        raise RuntimeError(
                            f"_run_inference returned {len(results)} results "
                            f"for a batch of {len(batch)}"
                        )
                except Exception as e:
                    for request in batch:
                        # A caller may have cancelled its future already;
                        # completing a done future raises InvalidStateError.
                        if not request.future.done():
                            request.future.set_exception(e)
                else:
                    for request, result in zip(batch, results):
                        if not request.future.done():
                            request.future.set_result(result)
                await asyncio.sleep(self._POLL_INTERVAL)
        finally:
            # Always release the flag so a later add_request can start a new
            # worker even if this one was cancelled or crashed.
            self.processing = False

    async def _run_inference(self, batch: List["InferenceRequest"]) -> List[str]:
        """Placeholder inference: sleeps briefly and echoes a prefix of each prompt.

        Returns:
            One result string per request, in batch order.
        """
        prompts = [req.prompt for req in batch]
        await asyncio.sleep(0.05)
        return [f"响应: {prompt[:20]}..." for prompt in prompts]
# Usage example
async def main(num_requests: int = 20):
    """Submit ``num_requests`` demo requests concurrently and print timing statistics.

    Args:
        num_requests: Number of requests to submit; must be positive.
            Defaults to 20.

    Raises:
        ValueError: If ``num_requests`` is not positive (the per-request
            average would divide by zero).
    """
    if num_requests <= 0:
        raise ValueError("num_requests must be positive")

    class MockModel:
        pass

    batched = BatchedInference(MockModel(), max_batch_size=8, max_wait_time=0.05)

    # perf_counter is monotonic, so the interval cannot be skewed by
    # wall-clock adjustments.
    start = time.perf_counter()
    pending = [
        batched.add_request(f"req-{i}", f"这是请求 {i} 的内容", max_length=512)
        for i in range(num_requests)
    ]
    await asyncio.gather(*pending)
    elapsed = time.perf_counter() - start

    print(f"{num_requests} 个请求耗时: {elapsed:.2f} 秒")
    print(f"平均每个请求: {elapsed/num_requests*1000:.1f} ms")
    print(f"吞吐量: {num_requests/elapsed:.1f} req/s")


# NOTE: this runs whenever the module is executed or imported; it is left
# unguarded to preserve the module's existing behavior.
asyncio.run(main())