Performance Optimization Best Practices
Systematic performance optimization strategies to make your AI applications respond quickly and run efficiently
Key Performance Indicators

| Metric | Target | Description |
|---|---|---|
| Response Time | < 1s | Time to first byte target |
| Availability | 99.9% | Service uptime |
| Concurrency | 1000 | Simultaneous request handling |
| Cache Hit | 80% | Cache hit rate target |
💡 Tip: Monitor these metrics regularly to identify and resolve performance bottlenecks early.
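To make the targets actionable, they can be encoded as alert thresholds and checked against live measurements. Here is a minimal sketch; the `KPI_TARGETS` dict and `check_kpis` helper are illustrative, not part of any library:

```python
# Hypothetical KPI thresholds mirroring the table above
KPI_TARGETS = {
    "p50_response_time_s": 1.0,   # < 1s time to first byte
    "availability": 0.999,        # 99.9% uptime
    "cache_hit_rate": 0.80,       # 80% cache hit target
}

def check_kpis(measured: dict) -> list[str]:
    """Return a human-readable alert for each missed target."""
    alerts = []
    if measured.get("p50_response_time_s", 0.0) > KPI_TARGETS["p50_response_time_s"]:
        alerts.append("Response time above 1s target")
    if measured.get("availability", 1.0) < KPI_TARGETS["availability"]:
        alerts.append("Availability below 99.9%")
    if measured.get("cache_hit_rate", 1.0) < KPI_TARGETS["cache_hit_rate"]:
        alerts.append("Cache hit rate below 80%")
    return alerts

print(check_kpis({"p50_response_time_s": 1.4, "availability": 0.9995, "cache_hit_rate": 0.6}))
```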
Core Optimization Strategies
1. Request Optimization
Batch Processing Requests
```python
# Inefficient: multiple individual requests
results = []
for item in items:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": item}]
    )
    results.append(response)
```

```python
# Efficient: batch processing with concurrent requests
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def batch_process(items):
    tasks = [
        async_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": item}]
        )
        for item in items
    ]
    return await asyncio.gather(*tasks)

results = asyncio.run(batch_process(items))
```
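Launching one task per item with no bound can trip provider rate limits at scale. A hedged variant that caps in-flight requests with an asyncio.Semaphore, reusing `async_client` from above (the limit of 10 is an illustrative choice, not a provider requirement):

```python
import asyncio

MAX_IN_FLIGHT = 10  # illustrative cap; tune to your rate limits
semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)

async def bounded_call(item):
    # Only MAX_IN_FLIGHT requests run concurrently
    async with semaphore:
        return await async_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": item}]
        )

async def batch_process_bounded(items):
    return await asyncio.gather(*(bounded_call(i) for i in items))
```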
Request Deduplication

```python
import hashlib

# In-memory cache keyed by a hash of the prompt.
# (functools.lru_cache on get_response would give a size-bounded variant.)
cache = {}

def get_response(prompt):
    # Identical prompts map to the same key
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()

    # Cache hit: skip the API call entirely
    if prompt_hash in cache:
        return cache[prompt_hash]

    # Cache miss: issue the request and store the result
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    cache[prompt_hash] = response
    return response
```

2. Streaming Response Optimization
❌ Traditional Method

- Wait for the complete response
- Delayed user experience
- Slow perceived speed

✅ Streaming Optimization

- Content displayed in real time
- Immediate feedback to the user
- Improved perceived speed
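The frontend reader below assumes the backend forwards chunks as the model produces them. A minimal server-side sketch using the OpenAI SDK's `stream=True`; the FastAPI route and the `/api/chat` path are illustrative assumptions chosen to match the fetch call:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.post("/api/chat")
async def chat(payload: dict):
    def token_stream():
        # stream=True yields chunks as the model generates them
        stream = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": payload["message"]}],
            stream=True,
        )
        for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield delta

    return StreamingResponse(token_stream(), media_type="text/plain")
```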
```javascript
// Frontend streaming processing
async function streamChat(message) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, stream: true })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    // Update the UI in real time
    appendToChat(chunk);
  }
}
```

3. Smart Caching Strategy
Multi-Level Cache Architecture

- L1 in-memory cache (Redis): millisecond responses
- L2 distributed cache (Memcached): responses within seconds
- L3 persistent storage (database): long-term storage
```python
import hashlib
import redis
import json
from datetime import timedelta

class SmartCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )

    def get_or_fetch(self, key, fetch_func, ttl=3600):
        """Smart caching: return the cached value if present,
        otherwise call fetch_func and cache its result."""
        # Try the cache first
        cached = self.redis_client.get(key)
        if cached:
            return json.loads(cached)

        # Cache miss: perform the actual request
        result = fetch_func()

        # Cache the result with a TTL
        self.redis_client.setex(
            key,
            timedelta(seconds=ttl),
            json.dumps(result)
        )
        return result

# Usage example
cache = SmartCache()

def get_ai_response(prompt):
    cache_key = f"ai:response:{hashlib.md5(prompt.encode()).hexdigest()}"

    def fetch():
        return client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ).choices[0].message.content

    return cache.get_or_fetch(cache_key, fetch, ttl=7200)
```
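SmartCache implements the Redis tier of the architecture above. To illustrate the multi-level idea, here is a hedged sketch that layers an in-process L1 dict in front of Redis; the `TwoTierCache` class is hypothetical, and a real L1 would bound its size (e.g. with an LRU):

```python
class TwoTierCache:
    """L1: in-process dict (fastest). L2: Redis via SmartCache."""

    def __init__(self, l2: SmartCache):
        self.l1 = {}   # per-process, lost on restart; unbounded for brevity
        self.l2 = l2   # shared across processes

    def get_or_fetch(self, key, fetch_func, ttl=3600):
        # L1 hit: no network round trip at all
        if key in self.l1:
            return self.l1[key]

        # L2 (Redis) handles the fetch-and-store on miss
        result = self.l2.get_or_fetch(key, fetch_func, ttl)
        self.l1[key] = result
        return result
```

The L3 tier (the database) is typically the system of record rather than a cache the application populates explicitly.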
4. Connection Pool Optimization

```python
import httpx
from typing import Optional

class APIConnectionPool:
    _instance: Optional['APIConnectionPool'] = None
    _client: Optional[httpx.AsyncClient] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            self._client = httpx.AsyncClient(
                limits=httpx.Limits(
                    max_keepalive_connections=20,
                    max_connections=100,
                    keepalive_expiry=30
                ),
                timeout=httpx.Timeout(30.0, connect=5.0),
                http2=True  # Enable HTTP/2
            )
        return self._client

    async def close(self):
        if self._client:
            await self._client.aclose()
            self._client = None

# Using the connection pool (inside an async function)
pool = APIConnectionPool()
client = await pool.get_client()
```

- Connection reuse: avoids the overhead of establishing new connections
- HTTP/2 support: multiplexing improves efficiency
- Automatic retries: improve the request success rate
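The pool above does not itself retry failed requests. One hedged approach: httpx's transport-level `retries` option covers connection setup failures, and a small wrapper with exponential backoff handles transient server errors. The `fetch_with_retry` helper and the health-check URL are illustrative:

```python
import asyncio
import httpx

async def fetch_with_retry(client: httpx.AsyncClient, url: str,
                           attempts: int = 3) -> httpx.Response:
    """Retry transient failures with exponential backoff (1s, 2s, 4s ...)."""
    for attempt in range(attempts):
        try:
            response = await client.get(url)
            if response.status_code < 500:
                return response  # success, or a non-retryable client error
        except httpx.TransportError:
            pass                 # network hiccup: back off and retry
        await asyncio.sleep(2 ** attempt)
    raise RuntimeError(f"{url} still failing after {attempts} attempts")

async def main():
    # Transport retries additionally cover connection setup failures
    transport = httpx.AsyncHTTPTransport(retries=3)
    async with httpx.AsyncClient(transport=transport) as client:
        response = await fetch_with_retry(client, "https://api.example.com/health")
        print(response.status_code)

asyncio.run(main())
```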
Performance Monitoring and Debugging

Performance Monitoring Code
```python
import time
import logging
from contextlib import contextmanager
from typing import Dict, Any
import statistics

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'response_times': [],
            'token_usage': [],
            'error_count': 0,
            'success_count': 0
        }

    @contextmanager
    def measure_time(self, operation: str):
        """Measure how long an operation takes."""
        start = time.time()
        try:
            yield
            self.metrics['success_count'] += 1
        except Exception as e:
            self.metrics['error_count'] += 1
            logging.error(f"Error in {operation}: {e}")
            raise
        finally:
            duration = time.time() - start
            self.metrics['response_times'].append(duration)
            logging.info(f"{operation} took {duration:.2f}s")

    def add_token_usage(self, tokens: int):
        """Record token usage."""
        self.metrics['token_usage'].append(tokens)

    def get_stats(self) -> Dict[str, Any]:
        """Return aggregated performance statistics."""
        if self.metrics['response_times']:
            return {
                'avg_response_time': statistics.mean(self.metrics['response_times']),
                'p95_response_time': statistics.quantiles(
                    self.metrics['response_times'], n=20
                )[18] if len(self.metrics['response_times']) > 20 else None,
                'total_tokens': sum(self.metrics['token_usage']),
                'error_rate': self.metrics['error_count'] /
                    (self.metrics['success_count'] + self.metrics['error_count'])
                    if (self.metrics['success_count'] + self.metrics['error_count']) > 0 else 0,
                'total_requests': self.metrics['success_count'] + self.metrics['error_count']
            }
        return {}

# Usage example
monitor = PerformanceMonitor()

async def monitored_api_call(prompt):
    with monitor.measure_time("API Call"):
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        monitor.add_token_usage(response.usage.total_tokens)
        return response

# Print statistics periodically
print(monitor.get_stats())
```

Performance Optimization Checklist
✅ Request Optimization

- Use batching to reduce the number of requests
- Implement request deduplication
- Keep prompts concise
- Set a reasonable max_tokens

✅ System Optimization

- Manage connections with a pool
- Implement a multi-level caching strategy
- Enable HTTP/2
- Configure load balancing

✅ Monitoring Metrics

- Track the response time distribution
- Monitor token usage
- Log error rates and retries
- Analyze the cache hit rate

✅ User Experience

- Use streaming responses
- Implement progress indicators
- Handle errors gracefully
- Provide a way to cancel requests (see the sketch after this list)
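Cancellation matters most for long streaming responses. A hedged sketch using asyncio task cancellation, reusing `async_client` from the batching example (the `stream_completion` coroutine and the two-second trigger are illustrative stand-ins for a user clicking "Stop"):

```python
import asyncio

async def stream_completion(prompt: str):
    stream = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)

async def main():
    task = asyncio.create_task(stream_completion("Write a long essay"))
    await asyncio.sleep(2)  # e.g. the user clicks "Stop"
    task.cancel()           # stop consuming the stream
    try:
        await task
    except asyncio.CancelledError:
        print("\n[generation cancelled]")

asyncio.run(main())
```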
Optimization Results Comparison

| Item | Before | After | Improvement |
|---|---|---|---|
| Average response time | 3.2s | 0.8s | 75%↓ |
| Concurrent throughput | 100/s | 1000/s | 10x |
| Cache hit rate | 20% | 80% | +60pp |
| API cost | $1000/mo | $400/mo | 60%↓ |
| Error rate | 5% | 0.1% | 98%↓ |
🚀 Start Optimizing Now

Performance optimization is an ongoing process. Start with the changes that most affect user experience, then roll out the remaining strategies step by step.

Step 1: Implement streaming responses and basic caching.
Step 2: Optimize request batching and connection pooling.
Step 3: Build out monitoring and keep iterating.