Comprehensive Rate Limit Handling Solution
Deeply understand API rate limiting, master mitigation strategies, and ensure stable, reliable services.
- Rate limiting protection: prevent service overload
- Smart retries: automatically wait for recovery
- Rate control: manage request frequency
- Load balancing: distribute load across multiple keys
1. API rate limiting rules
Types of limits
- RPM (Requests Per Minute): caps how many requests you may send per minute, preventing excessive call frequency
- TPM (Tokens Per Minute): caps how many tokens you may process per minute, controlling total processing volume
Model limit comparison
| Model | RPM | TPM | Tier |
|---|---|---|---|
| GPT-4o | 500 | 30,000 | Tier 1 |
| GPT-4o | 5,000 | 450,000 | Tier 4 |
| GPT-4o mini | 5,000 | 200,000 | Tier 1 |
| GPT-4o mini | 30,000 | 150,000,000 | Tier 5 |
| Claude 3.5 Sonnet | 1,000 | 100,000 | Standard |
| Claude 3.5 Haiku | 5,000 | 500,000 | Standard |
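In practice, your effective throughput is bounded by whichever of the two limits you exhaust first. A minimal sketch of working out which limit binds for a given average request size, using the Tier 1 GPT-4o figures from the table above:

```python
# Rough capacity check: which limit (RPM or TPM) binds first?
# Figures are the Tier 1 GPT-4o numbers from the table above.
rpm_limit = 500
tpm_limit = 30_000

avg_tokens_per_request = 800  # prompt + completion, your own estimate

# Max requests per minute allowed by each limit
by_rpm = rpm_limit
by_tpm = tpm_limit // avg_tokens_per_request  # 30_000 // 800 = 37

effective_rpm = min(by_rpm, by_tpm)
print(f"Effective throughput: {effective_rpm} requests/minute "
      f"({'TPM' if by_tpm < by_rpm else 'RPM'} is the binding limit)")
```

At 800 tokens per request, TPM caps you at 37 requests/minute long before the 500 RPM limit matters, which is why token estimation (section 7) is worth the effort.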
2. Basic rate-limit handling
Error handling and retries
```python
import time
import requests
from datetime import datetime
from typing import Optional, Dict

class RateLimitHandler:
    """Smart rate limit handler"""

    def __init__(self):
        self.retry_after_default = 5  # Default retry wait time (seconds)

    def make_request_with_retry(
        self,
        url: str,
        headers: Dict,
        data: Dict,
        max_retries: int = 3
    ) -> Optional[Dict]:
        """Make a request with rate limit handling"""
        for attempt in range(max_retries):
            try:
                response = requests.post(url, headers=headers, json=data)

                # Success
                if response.status_code == 200:
                    return response.json()

                # Rate limit error
                elif response.status_code == 429:
                    retry_after = self._get_retry_after(response)
                    print(f"Rate limit hit, waiting {retry_after} seconds...")
                    # Show remaining limit info
                    self._show_rate_limit_info(response.headers)
                    time.sleep(retry_after)
                    continue

                # Other errors
                else:
                    print(f"Request failed: {response.status_code}")
                    return None

            except Exception as e:
                print(f"Request exception: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff

        return None

    def _get_retry_after(self, response) -> int:
        """Get retry time from response headers"""
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            # Could be seconds or an HTTP date
            try:
                return int(retry_after)
            except ValueError:
                # If it's a date, calculate the wait in seconds
                retry_time = datetime.strptime(retry_after, "%a, %d %b %Y %H:%M:%S %Z")
                wait_seconds = (retry_time - datetime.utcnow()).total_seconds()
                return max(0, int(wait_seconds))
        return self.retry_after_default

    def _show_rate_limit_info(self, headers):
        """Display rate limit info"""
        info = {
            "Limit": headers.get('X-RateLimit-Limit'),
            "Remaining": headers.get('X-RateLimit-Remaining'),
            "Reset": headers.get('X-RateLimit-Reset'),
            "Retry-After": headers.get('Retry-After')
        }
        for key, value in info.items():
            if value:
                print(f"  {key}: {value}")

# Usage example
handler = RateLimitHandler()
response = handler.make_request_with_retry(
    url="https://api.n1n.ai/v1/chat/completions",
    headers={"Authorization": "Bearer your-key"},
    data={
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "Hello"}]
    }
)
```

💡 Response headers
- X-RateLimit-Limit: total requests allowed in the current window
- X-RateLimit-Remaining: remaining quota in the window
- X-RateLimit-Reset: timestamp when the quota resets
- Retry-After: suggested wait in seconds
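Rather than waiting for a 429, you can also throttle proactively by inspecting these headers after every response. A minimal sketch, assuming the header names listed above and a Unix-timestamp Reset value; both conventions vary by provider, so adapt to yours:

```python
import time
import requests

def throttled_post(url: str, headers: dict, payload: dict,
                   min_remaining: int = 5) -> requests.Response:
    """POST, then pause if the server reports that quota is nearly gone.

    Assumes X-RateLimit-Remaining / X-RateLimit-Reset headers as described
    above; actual header names and Reset semantics differ between providers.
    """
    response = requests.post(url, headers=headers, json=payload)

    remaining = response.headers.get("X-RateLimit-Remaining")
    reset = response.headers.get("X-RateLimit-Reset")

    if remaining is not None and int(remaining) <= min_remaining and reset:
        # Reset is assumed to be a Unix timestamp here; some APIs
        # return a duration in seconds instead.
        wait = max(0.0, float(reset) - time.time())
        print(f"Quota nearly exhausted ({remaining} left); pausing {wait:.1f}s")
        time.sleep(wait)

    return response
```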
3. Advanced rate-limiting strategies
Token bucket algorithm
```python
import asyncio
import time
from collections import deque
from typing import Dict, Optional

class TokenBucketRateLimiter:
    """Token-bucket-style rate limiter over a sliding 60-second window"""

    def __init__(self, rpm: int, tpm: int):
        self.rpm = rpm  # Requests per minute
        self.tpm = tpm  # Tokens per minute
        self.request_times = deque()
        self.token_usage = deque()
        self.lock = asyncio.Lock()

    async def wait_if_needed(self, estimated_tokens: int = 0):
        """Wait if needed to comply with limits"""
        async with self.lock:
            # Loop instead of recursing: asyncio.Lock is not reentrant,
            # so re-entering wait_if_needed while holding it would deadlock.
            while True:
                current_time = time.time()

                # Clean old records (older than 60 seconds)
                self._cleanup_old_records(current_time)

                # Check request rate
                if len(self.request_times) >= self.rpm:
                    wait_time = 60 - (current_time - self.request_times[0])
                    if wait_time > 0:
                        print(f"RPM limit reached ({self.rpm}), waiting {wait_time:.1f} seconds")
                        await asyncio.sleep(wait_time)
                        continue

                # Check token rate
                current_tokens = sum(tokens for _, tokens in self.token_usage)
                if self.token_usage and current_tokens + estimated_tokens > self.tpm:
                    wait_time = 60 - (current_time - self.token_usage[0][0])
                    if wait_time > 0:
                        print(f"TPM limit reached ({self.tpm}), waiting {wait_time:.1f} seconds")
                        await asyncio.sleep(wait_time)
                        continue

                break

            # Record the request
            self.request_times.append(current_time)
            if estimated_tokens > 0:
                self.token_usage.append((current_time, estimated_tokens))

    def _cleanup_old_records(self, current_time: float):
        """Drop records older than the 60-second window"""
        cutoff_time = current_time - 60
        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()
        while self.token_usage and self.token_usage[0][0] < cutoff_time:
            self.token_usage.popleft()

class MultiKeyRateLimiter:
    """Multi-key load-balanced rate limiter"""

    def __init__(self, api_keys: list, limits: Dict):
        self.api_keys = api_keys
        self.limiters = {
            key: TokenBucketRateLimiter(
                rpm=limits.get("rpm", 500),
                tpm=limits.get("tpm", 30000)
            )
            for key in api_keys
        }
        self.key_usage = {key: 0 for key in api_keys}

    async def get_available_key(self, estimated_tokens: int = 0) -> Optional[str]:
        """Get an available API key"""
        # Try keys in order of least use
        sorted_keys = sorted(self.key_usage.items(), key=lambda x: x[1])
        for key, _ in sorted_keys:
            limiter = self.limiters[key]
            # Check if it can be used immediately
            if self._can_use_immediately(limiter, estimated_tokens):
                self.key_usage[key] += 1
                return key

        # If none is available immediately, wait on the least-used key
        least_used_key = sorted_keys[0][0]
        await self.limiters[least_used_key].wait_if_needed(estimated_tokens)
        self.key_usage[least_used_key] += 1
        return least_used_key

    def _can_use_immediately(self, limiter: TokenBucketRateLimiter, tokens: int) -> bool:
        """Check whether a limiter has headroom right now"""
        current_time = time.time()
        limiter._cleanup_old_records(current_time)
        if len(limiter.request_times) >= limiter.rpm:
            return False
        current_tokens = sum(t for _, t in limiter.token_usage)
        if current_tokens + tokens > limiter.tpm:
            return False
        return True

# Usage example
api_keys = ["key1", "key2", "key3"]
limiter = MultiKeyRateLimiter(
    api_keys=api_keys,
    limits={"rpm": 500, "tpm": 30000}
)

async def make_request(prompt: str):
    estimated_tokens = len(prompt.split()) * 2  # Rough word-based estimate
    api_key = await limiter.get_available_key(estimated_tokens)
    print(f"Using API key: {api_key}")
    # Send the request here...
```

4. Exponential backoff strategies
Smart retry mechanism
```python
import asyncio
import random
from typing import Any, Callable, Optional

class ExponentialBackoff:
    """Exponential backoff retry strategy"""

    def __init__(
        self,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Calculate backoff delay"""
        # Exponential growth, capped at max_delay
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )
        # Add jitter to avoid the thundering-herd problem
        if self.jitter:
            delay = delay * (0.5 + random.random() * 0.5)
        return delay

async def retry_with_backoff(
    func: Callable,
    max_attempts: int = 5,
    backoff: Optional[ExponentialBackoff] = None
) -> Any:
    """Retry wrapper with exponential backoff"""
    if backoff is None:
        backoff = ExponentialBackoff()

    last_exception = None
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            # Check if it's a rate limit error
            response = getattr(e, 'response', None)
            if response is not None and response.status_code == 429:
                # Prefer the Retry-After header from the server
                retry_after = response.headers.get('Retry-After')
                if retry_after:
                    delay = int(retry_after)
                else:
                    delay = backoff.get_delay(attempt)
                print(f"Attempt {attempt + 1}/{max_attempts} failed, retrying in {delay:.1f}s")
                await asyncio.sleep(delay)
            else:
                # Non-rate-limit error; re-raise
                raise

    # All retries failed
    raise Exception(f"Failed after {max_attempts} attempts") from last_exception

# Usage example (make_api_request and RateLimitError are placeholders
# for your own HTTP call and rate-limit exception type)
async def call_api():
    response = await make_api_request()
    if response.status_code == 429:
        raise RateLimitError(response)
    return response

# Custom backoff strategy
custom_backoff = ExponentialBackoff(
    base_delay=2.0,        # Initial delay: 2 seconds
    max_delay=120.0,       # Max delay: 2 minutes
    exponential_base=3.0,  # 3x growth per attempt
    jitter=True            # Add jitter
)

result = await retry_with_backoff(
    call_api,
    max_attempts=5,
    backoff=custom_backoff
)
```

🔄 Backoff strategies comparison
- Fixed delay: simple, but synchronized retries can cause congestion
- Linear backoff: wait time grows steadily with each attempt
- Exponential backoff: wait time grows quickly; most effective in practice
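A quick way to see the difference between the three schedules is to print the delays each one produces per attempt; a minimal sketch with jitter disabled so the numbers are deterministic:

```python
# Compare fixed, linear, and exponential delay schedules (seconds per attempt)
base, cap = 1.0, 60.0

for attempt in range(6):
    fixed = base                                 # 1, 1, 1, 1, 1, 1
    linear = min(base * (attempt + 1), cap)      # 1, 2, 3, 4, 5, 6
    exponential = min(base * 2 ** attempt, cap)  # 1, 2, 4, 8, 16, 32
    print(f"attempt {attempt}: fixed={fixed:.0f}s  "
          f"linear={linear:.0f}s  exponential={exponential:.0f}s")
```

Exponential backoff backs away from a struggling service fastest, which is why it is the usual default; jitter is then layered on top so that many clients do not retry in lockstep.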
5. Circuit breaker pattern
Service protection mechanism
```python
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal: requests pass through
    OPEN = "open"            # Tripped: requests are rejected immediately
    HALF_OPEN = "half_open"  # Probing: a trial request is allowed through

class CircuitBreaker:
    """Circuit breaker implementation"""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        """Call a function through the circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open; service temporarily unavailable")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check whether enough time has passed to probe for recovery"""
        return (
            self.last_failure_time is not None and
            datetime.now() > self.last_failure_time + timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Handle a successful call: close the circuit"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle a failed call: open the circuit past the threshold"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} consecutive failures")

# Usage example (make_api_request and get_fallback_response are placeholders)
circuit_breaker = CircuitBreaker(
    failure_threshold=3,  # Open after 3 consecutive failures
    recovery_timeout=30   # Probe for recovery after 30 seconds
)

async def protected_api_call():
    try:
        return await circuit_breaker.call(make_api_request)
    except Exception as e:
        print(f"API call failed: {e}")
        # Return a fallback response
        return get_fallback_response()
```

6. Monitoring and optimization
Rate limit monitoring system
```python
import asyncio
from collections import defaultdict
from datetime import datetime

class RateLimitMonitor:
    """Rate limit monitor"""

    def __init__(self):
        self.stats = defaultdict(lambda: {
            "requests": 0,
            "rate_limited": 0,
            "total_wait_time": 0,
            "last_limit_time": None
        })

    def record_request(self, endpoint: str):
        """Record a request"""
        self.stats[endpoint]["requests"] += 1

    def record_rate_limit(self, endpoint: str, wait_time: float):
        """Record a rate limit event"""
        stats = self.stats[endpoint]
        stats["rate_limited"] += 1
        stats["total_wait_time"] += wait_time
        stats["last_limit_time"] = datetime.now()

    def get_report(self) -> dict:
        """Generate monitoring report"""
        report = {}
        for endpoint, stats in self.stats.items():
            requests = stats["requests"]
            success_rate = (1 - stats["rate_limited"] / requests) if requests > 0 else 0.0
            avg_wait = stats["total_wait_time"] / stats["rate_limited"] if stats["rate_limited"] > 0 else 0
            report[endpoint] = {
                "Total requests": requests,
                "Rate-limited count": stats["rate_limited"],
                "Success rate": f"{success_rate:.2%}",
                "Avg wait time": f"{avg_wait:.2f}s",
                "Last limited time": stats["last_limit_time"].strftime("%Y-%m-%d %H:%M:%S")
                                     if stats["last_limit_time"] else "N/A"
            }
        return report

    def get_recommendations(self) -> list:
        """Get optimization recommendations"""
        recommendations = []
        for endpoint, stats in self.stats.items():
            limit_rate = stats["rate_limited"] / stats["requests"] if stats["requests"] > 0 else 0
            if limit_rate > 0.1:  # More than 10% of requests rate-limited
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"High rate limit ratio ({limit_rate:.1%})",
                    "suggestion": "Lower request frequency or use multiple API keys"
                })
            if stats["total_wait_time"] > 300:  # Total wait exceeds 5 minutes
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"Excessive wait time ({stats['total_wait_time']:.0f}s)",
                    "suggestion": "Implement request queueing and batching"
                })
        return recommendations

# Integrate into request handling (make_request and RateLimitError are placeholders)
monitor = RateLimitMonitor()

async def monitored_request(endpoint: str, **kwargs):
    monitor.record_request(endpoint)
    try:
        response = await make_request(endpoint, **kwargs)
        return response
    except RateLimitError as e:
        wait_time = e.retry_after
        monitor.record_rate_limit(endpoint, wait_time)
        await asyncio.sleep(wait_time)
        # Note: retries recursively without a cap; bound this in production
        return await monitored_request(endpoint, **kwargs)

# Periodic reporting
async def print_monitor_report():
    while True:
        await asyncio.sleep(300)  # Every 5 minutes
        print("\n=== Rate limit monitoring report ===")
        report = monitor.get_report()
        for endpoint, stats in report.items():
            print(f"\n{endpoint}:")
            for key, value in stats.items():
                print(f"  {key}: {value}")
        recommendations = monitor.get_recommendations()
        if recommendations:
            print("\nOptimization suggestions:")
            for rec in recommendations:
                print(f"  - {rec['endpoint']}: {rec['suggestion']}")
```

7. Best practices summary
🛡️ Prevention strategies
- ✅ Implement request rate control
- ✅ Use token bucket algorithms
- ✅ Batch requests
- ✅ Estimate token usage (see the sketch after this list)
- ✅ Load balance across multiple API keys
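Token estimation matters because the provider counts tokens on its side when enforcing TPM; underestimating leads to surprise 429s. A minimal sketch of two approaches, from a crude rule of thumb to an exact count. The tiktoken library shown here is OpenAI's tokenizer; other providers tokenize differently, so treat the exact-count path as OpenAI-specific:

```python
def rough_token_estimate(text: str) -> int:
    """Crude heuristic: roughly 4 characters per token for English text."""
    return max(1, len(text) // 4)

# More accurate for OpenAI models, if tiktoken is installed:
#   pip install tiktoken
def exact_token_count(text: str, model: str = "gpt-4o") -> int:
    import tiktoken
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

prompt = "Explain API rate limiting in one paragraph."
print(rough_token_estimate(prompt))  # heuristic estimate
```

Remember to budget for the completion as well as the prompt; many APIs count max output tokens against TPM when admitting a request.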
🚨 Mitigation measures
- ✅ Gracefully handle 429 errors
- ✅ Retry with exponential backoff
- ✅ Circuit breaker protection (combined with retries in the sketch after this list)
- ✅ Fallback solutions
- ✅ Real-time monitoring alerts
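These measures compose naturally: wrap the circuit breaker call in the backoff retrier so transient 429s are retried while sustained failure trips the breaker. A minimal sketch reusing the classes defined in sections 4 and 5 (make_api_request remains a placeholder):

```python
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
backoff = ExponentialBackoff(base_delay=1.0, max_delay=60.0)

async def resilient_call():
    # retry_with_backoff retries only rate-limit errors; other failures,
    # including "circuit breaker is open", propagate immediately
    return await retry_with_backoff(
        lambda: breaker.call(make_api_request),
        max_attempts=5,
        backoff=backoff,
    )
```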
✨ Advanced techniques
- Use queues to smooth request spikes (see the sketch below)
- Ramp traffic up gradually during warm-up
- Allocate quota by business priority
- Periodically analyze rate limit patterns and optimize
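For the first of these, a minimal sketch of queue-based smoothing: producers enqueue work freely, while a single worker dequeues at a fixed pace so bursts never reach the API. The pacing interval and the in-line "processing" step are assumptions to adapt to your own client:

```python
import asyncio

async def worker(queue: asyncio.Queue, interval: float):
    """Drain the queue at a fixed pace, smoothing out bursts."""
    while True:
        prompt, future = await queue.get()
        # Replace this line with your real API call
        future.set_result(f"processed: {prompt}")
        queue.task_done()
        await asyncio.sleep(interval)  # pace: one request per interval

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    # 500 RPM -> at most one request every 0.12 s
    asyncio.create_task(worker(queue, interval=60 / 500))

    # Producers can enqueue a burst; the worker drains it smoothly
    futures = []
    for i in range(10):
        fut = asyncio.get_running_loop().create_future()
        await queue.put((f"request {i}", fut))
        futures.append(fut)

    results = await asyncio.gather(*futures)
    print(results)

asyncio.run(main())
```

The queue decouples demand from dispatch: callers see at most extra latency during spikes, while the API sees a steady, limit-compliant request rate.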