Performance Optimization Best Practices

Systematic performance optimization strategies to make your AI applications respond quickly and run efficiently

Advanced Optimization | Updated: December 2024 | Important

Key Performance Indicators

  • Response Time: < 1s (time-to-first-byte target)
  • Availability: 99.9% (service uptime)
  • Concurrency: 1000 simultaneous requests handled
  • Cache Hit Rate: 80% (target hit rate)

💡 Tip: Monitor these metrics regularly to identify and resolve performance bottlenecks early.

Core Optimization Strategies

1. Request Optimization

Batch Processing Requests

# Inefficient: multiple sequential individual requests
from openai import OpenAI

client = OpenAI()

results = []
for item in items:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": item}]
    )
    results.append(response)

# Efficient: Batch processing
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def batch_process(items):
    tasks = [
        async_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": item}]
        )
        for item in items
    ]
    return await asyncio.gather(*tasks)

results = asyncio.run(batch_process(items))
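
Firing every request at once with asyncio.gather can exceed provider rate limits. A minimal sketch of capping in-flight requests with an asyncio.Semaphore (the limit of 10 is an illustrative assumption; tune it to your quota):

async def bounded_batch_process(items, max_concurrency=10):
    # The semaphore caps how many requests are in flight at the same time
    semaphore = asyncio.Semaphore(max_concurrency)

    async def one_request(item):
        async with semaphore:
            return await async_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": item}]
            )

    return await asyncio.gather(*(one_request(item) for item in items))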

Request Deduplication

import hashlib
from functools import lru_cache

# Option A: functools.lru_cache keyed directly on the prompt (bounded, LRU eviction)
@lru_cache(maxsize=1000)
def cached_api_call(prompt: str):
    """Cache results for identical prompts."""
    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )

# Option B: explicit dictionary cache keyed on a hash of the prompt
cache = {}

def get_response(prompt):
    # Hash the prompt so identical requests map to the same cache key
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    
    # Return the cached response for duplicate prompts
    if prompt_hash in cache:
        return cache[prompt_hash]
    
    # Cache miss: issue a new request
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    
    cache[prompt_hash] = response
    return response

2. Streaming Response Optimization

❌ Traditional Method

  • Wait for the complete response
  • Delayed user experience
  • Slow perceived speed

✅ Streaming Optimization

  • Real-time content display
  • Immediate feedback to the user
  • Improved perceived speed

// Frontend streaming processing
async function streamChat(message) {
    const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message, stream: true })
    });
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value);
        // Update the UI in real time
        appendToChat(chunk);
    }
}
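
The frontend code above assumes a backend endpoint that forwards model output as it arrives. A minimal server-side sketch, assuming the OpenAI Python SDK and a plain generator (the route wiring and web framework are left out as assumptions):

from openai import OpenAI

client = OpenAI()

def stream_chat(message):
    # stream=True yields chunks as the model generates them
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta  # forward each piece to the HTTP response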

3. Smart Caching Strategy

Multi-Level Cache Architecture

L1: in-memory cache (Redis), millisecond-level responses
L2: distributed cache (Memcached), second-level responses
L3: persistent storage (database), long-term storage

import hashlib
import redis
import json
from datetime import timedelta

class SmartCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
    
    def get_or_fetch(self, key, fetch_func, ttl=3600):
        """智能缓存: 优先从缓存获取, 否则执行函数并缓存结果"""
        # 尝试从缓存获取
        cached = self.redis_client.get(key)
        if cached:
            return json.loads(cached)
        
        # Cache miss: perform the actual request
        result = fetch_func()
        
        # Cache the result
        self.redis_client.setex(
            key,
            timedelta(seconds=ttl),
            json.dumps(result)
        )
        
        return result

# Usage example
cache = SmartCache()

def get_ai_response(prompt):
    cache_key = f"ai:response:{hashlib.md5(prompt.encode()).hexdigest()}"
    
    def fetch():
        return client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        ).choices[0].message.content
    
    return cache.get_or_fetch(cache_key, fetch, ttl=7200)
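
The architecture above lists several tiers, while SmartCache only talks to Redis. A sketch of putting an in-process L1 dictionary in front of it (the TieredCache name and the 1024-entry cap are illustrative assumptions):

class TieredCache(SmartCache):
    """L1 in-process dict in front of the L2 Redis cache."""

    def __init__(self, l1_max_items=1024):
        super().__init__()
        self._l1 = {}
        self._l1_max_items = l1_max_items

    def get_or_fetch(self, key, fetch_func, ttl=3600):
        # L1: in-process lookup, no network round trip
        if key in self._l1:
            return self._l1[key]

        # L2: Redis, then the real fetch on a full miss
        result = super().get_or_fetch(key, fetch_func, ttl)

        # Populate L1 with a crude size cap (evict an arbitrary entry when full)
        if len(self._l1) >= self._l1_max_items:
            self._l1.pop(next(iter(self._l1)))
        self._l1[key] = result
        return result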

4. Connection Pool Optimization

import httpx
from typing import Optional

class APIConnectionPool:
    _instance: Optional['APIConnectionPool'] = None
    _client: Optional[httpx.AsyncClient] = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    async def get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            self._client = httpx.AsyncClient(
                limits=httpx.Limits(
                    max_keepalive_connections=20,
                    max_connections=100,
                    keepalive_expiry=30
                ),
                timeout=httpx.Timeout(30.0, connect=5.0),
                http2=True  # Enable HTTP/2 (requires the httpx[http2] extra)
            )
        return self._client
    
    async def close(self):
        if self._client:
            await self._client.aclose()
            self._client = None

# Use the shared connection pool (from within async code)
pool = APIConnectionPool()
client = await pool.get_client()

  • Connection reuse: reduces connection-setup overhead
  • HTTP/2 support: multiplexing improves throughput
  • Automatic retries: improve the request success rate (see the sketch below)
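
httpx does not retry failed requests on its own, so retries have to be layered on top of the pool. A minimal sketch of exponential backoff with jitter (the max_retries and base_delay values are arbitrary assumptions):

import asyncio
import random
import httpx

async def request_with_retries(client: httpx.AsyncClient, url: str, payload: dict,
                               max_retries: int = 3, base_delay: float = 0.5):
    """Retry transient failures with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            response = await client.post(url, json=payload)
            # Treat rate limits and server errors as retryable
            if response.status_code in (429, 500, 502, 503):
                raise httpx.HTTPStatusError(
                    "retryable status", request=response.request, response=response
                )
            return response
        except (httpx.TransportError, httpx.HTTPStatusError):
            if attempt == max_retries:
                raise
            # Back off: 0.5s, 1s, 2s, ... plus a little random jitter
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))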

Performance Monitoring and Debugging

Performance Monitoring Code

import time
import logging
from contextlib import contextmanager
from typing import Dict, Any
import statistics

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'response_times': [],
            'token_usage': [],
            'error_count': 0,
            'success_count': 0
        }
    
    @contextmanager
    def measure_time(self, operation: str):
        """测量操作执行时间"""
        start = time.time()
        try:
            yield
            self.metrics['success_count'] += 1
        except Exception as e:
            self.metrics['error_count'] += 1
            logging.error(f"Error in {operation}: {e}")
            raise
        finally:
            duration = time.time() - start
            self.metrics['response_times'].append(duration)
            logging.info(f"{operation} took {duration:.2f}s")
    
    def add_token_usage(self, tokens: int):
        """记录Tokenusing量"""
        self.metrics['token_usage'].append(tokens)
    
    def get_stats(self) -> Dict[str, Any]:
        """获取性能统计"""
        if self.metrics['response_times']:
            return {
                'avg_response_time': statistics.mean(self.metrics['response_times']),
                'p95_response_time': statistics.quantiles(
                    self.metrics['response_times'], n=20
                )[18] if len(self.metrics['response_times']) > 20 else None,
                'total_tokens': sum(self.metrics['token_usage']),
                'error_rate': self.metrics['error_count'] / 
                    (self.metrics['success_count'] + self.metrics['error_count'])
                    if (self.metrics['success_count'] + self.metrics['error_count']) > 0 else 0,
                'total_requests': self.metrics['success_count'] + self.metrics['error_count']
            }
        return {}

# Usage example
monitor = PerformanceMonitor()

async def monitored_api_call(prompt):
    with monitor.measure_time("API Call"):
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        monitor.add_token_usage(response.usage.total_tokens)
        return response

# Periodically print aggregate statistics
print(monitor.get_stats())

Performance Optimization Checklist

✅ Request Optimization

  • Use batching to reduce the number of requests
  • Implement request deduplication
  • Keep prompts concise
  • Set a reasonable max_tokens (see the sketch after this list)
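
The last two items translate directly into request construction. A minimal sketch, reusing the client object from the earlier examples and assuming a simple character budget for trimming history (the 4000-character and 512-token limits are arbitrary assumptions):

def build_request(history, user_message, max_prompt_chars=4000, max_tokens=512):
    # Keep only as much recent history as fits the character budget
    messages, used = [], len(user_message)
    for msg in reversed(history):
        if used + len(msg["content"]) > max_prompt_chars:
            break
        messages.append(msg)
        used += len(msg["content"])
    messages.reverse()
    messages.append({"role": "user", "content": user_message})

    return client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=max_tokens  # cap output length to bound latency and cost
    )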

✅ System Optimization

  • Use connection pooling
  • Implement a multi-level caching strategy
  • Enable HTTP/2
  • Configure load balancing

✅ Monitoring Metrics

  • Track the response-time distribution
  • Monitor token usage
  • Record error rates and retries
  • Analyze cache hit rates (see the sketch below)
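
Hit rate is easiest to record at the cache layer itself. A sketch that extends the earlier SmartCache with hit/miss counters (the InstrumentedCache name is an illustrative assumption):

import json

class InstrumentedCache(SmartCache):
    """SmartCache variant that counts hits and misses so the hit rate can be reported."""

    def __init__(self):
        super().__init__()
        self.hits = 0
        self.misses = 0

    def get_or_fetch(self, key, fetch_func, ttl=3600):
        cached = self.redis_client.get(key)
        if cached is not None:
            self.hits += 1
            return json.loads(cached)
        self.misses += 1
        return super().get_or_fetch(key, fetch_func, ttl)

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0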

✅ User Experience

  • Use streaming responses
  • Implement progress indicators
  • Handle errors gracefully
  • Provide a way to cancel requests

Optimization Results: Before vs. After

Metric                     Before           After            Improvement
Average response time      3.2s             0.8s             75% faster
Concurrent throughput      100 req/s        1000 req/s       10x
Cache hit rate             20%              80%              +60 pts
API cost                   $1000/month      $400/month       60% lower
Error rate                 5%               0.1%             98% lower

🚀 Start Optimizing Now

Performance optimization is an ongoing process. Start with the parts that affect the user experience most, then roll out the remaining strategies step by step.

Step 1

Implement streaming responses and basic caching

Step 2

Optimize request batching and connection pooling

Step 3

Build out monitoring and keep optimizing continuously