Comprehensive Rate Limit Handling Solution

Deeply understand API rate limiting, master mitigation strategies, and ensure stable, reliable services.

  • Rate limiting protection: prevent service overload
  • Smart retries: automatically wait for recovery
  • Rate control: manage request frequency
  • Load balancing: distribute traffic across multiple keys

1. API rate limiting rules

Types of limits

  • RPM (Requests Per Minute): caps how many requests you can send per minute, preventing excessive call frequency
  • TPM (Tokens Per Minute): caps how many tokens you can process per minute, controlling total throughput

Model limit comparison

Model               RPM     TPM           Tier
------------------  ------  ------------  --------
GPT-4o              500     30,000        Tier 1
GPT-4o              5,000   450,000       Tier 4
GPT-4o mini         5,000   200,000       Tier 1
GPT-4o mini         30,000  150,000,000   Tier 5
Claude 3.5 Sonnet   1,000   100,000       Standard
Claude 3.5 Haiku    5,000   500,000       Standard
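
Whichever limit you exhaust first becomes your effective ceiling. A quick back-of-envelope check, using the Tier 1 GPT-4o row above (the average token figure is an assumption you would replace with your own workload's):

# Which limit binds first at a given average request size?
rpm, tpm = 500, 30_000                # Tier 1 GPT-4o row from the table above
avg_tokens_per_request = 200          # assumed average (prompt + completion)

requests_allowed_by_tpm = tpm // avg_tokens_per_request  # 150
effective_ceiling = min(rpm, requests_allowed_by_tpm)
binding = "TPM" if requests_allowed_by_tpm < rpm else "RPM"
print(f"Effective ceiling: {effective_ceiling} requests/min ({binding}-bound)")
# -> Effective ceiling: 150 requests/min (TPM-bound)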

2. Basic rate-limit handling

Error handling and retries

import time
import requests
from typing import Optional, Dict

class RateLimitHandler:
    """Smart rate limit handler"""
    
    def __init__(self):
        self.retry_after_default = 5  # Default retry wait time (seconds)
        
    def make_request_with_retry(
        self,
        url: str,
        headers: Dict,
        data: Dict,
        max_retries: int = 3
    ) -> Optional[Dict]:
        """Make a request with rate limit handling"""
        
        for attempt in range(max_retries):
            try:
                response = requests.post(url, headers=headers, json=data)
                
                # Success
                if response.status_code == 200:
                    return response.json()
                
                # Rate limit error
                elif response.status_code == 429:
                    retry_after = self._get_retry_after(response)
                    print(f"Rate limit hit, waiting {retry_after} seconds...")
                    
                    # Show remaining limit info
                    self._show_rate_limit_info(response.headers)
                    
                    time.sleep(retry_after)
                    continue
                
                # Other errors
                else:
                    print(f"Request failed: {response.status_code}")
                    return None
                    
            except Exception as e:
                print(f"Request exception: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    
        return None
    
    def _get_retry_after(self, response) -> int:
        """Get retry time from response headers"""
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            # Could be seconds or a timestamp
            try:
                return int(retry_after)
            except ValueError:
                # If it's an HTTP date (e.g. "Wed, 21 Oct 2025 07:28:00 GMT"),
                # compute the seconds until that time
                from email.utils import parsedate_to_datetime
                from datetime import datetime, timezone
                retry_time = parsedate_to_datetime(retry_after)
                wait_seconds = (retry_time - datetime.now(timezone.utc)).total_seconds()
                return max(0, int(wait_seconds))
        
        return self.retry_after_default
    
    def _show_rate_limit_info(self, headers):
        """Display rate limit info"""
        info = {
            "Limit": headers.get('X-RateLimit-Limit'),
            "Remaining": headers.get('X-RateLimit-Remaining'),
            "Reset": headers.get('X-RateLimit-Reset'),
            "Retry-After": headers.get('Retry-After')
        }
        
        for key, value in info.items():
            if value:
                print(f"  {key}: {value}")

# Usage example
handler = RateLimitHandler()
response = handler.make_request_with_retry(
    url="https://api.n1n.ai/v1/chat/completions",
    headers={"Authorization": "Bearer your-key"},
    data={
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "Hello"}]
    }
)

💡 Response headers

  • X-RateLimit-Limit: Total allowed
  • X-RateLimit-Remaining: Remaining quota
  • X-RateLimit-Reset: Reset timestamp
  • Retry-After: Suggested wait seconds
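
These headers also let you throttle proactively, before a 429 ever occurs. A minimal sketch, assuming X-RateLimit-Reset carries an epoch timestamp in seconds (header semantics vary by provider, so check your API's docs):

import time

def proactive_delay(headers: dict, threshold: int = 5) -> float:
    """Seconds to pause when the remaining quota is nearly exhausted."""
    remaining = headers.get("X-RateLimit-Remaining")
    reset = headers.get("X-RateLimit-Reset")
    if remaining is not None and reset is not None and int(remaining) <= threshold:
        # Sleep until the window resets (assumes epoch seconds)
        return max(0.0, float(reset) - time.time())
    return 0.0

# After each response: time.sleep(proactive_delay(response.headers))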

3. Advanced rate-limiting strategies

Token bucket algorithm

import asyncio
from collections import deque
from typing import Dict, Optional
import time

class TokenBucketRateLimiter:
    """Token bucket rate limiter"""
    
    def __init__(self, rpm: int, tpm: int):
        self.rpm = rpm  # Requests per minute
        self.tpm = tpm  # Tokens per minute
        self.request_times = deque()
        self.token_usage = deque()
        self.lock = asyncio.Lock()
    
    async def wait_if_needed(self, estimated_tokens: int = 0):
        """Wait if needed to comply with limits"""
        async with self.lock:
            # Loop instead of recursing: asyncio.Lock is not reentrant,
            # so a recursive call here would deadlock on self.lock
            while True:
                current_time = time.time()
                
                # Clean old records (older than 60 seconds)
                self._cleanup_old_records(current_time)
                
                # Check request rate
                if len(self.request_times) >= self.rpm:
                    wait_time = 60 - (current_time - self.request_times[0])
                    if wait_time > 0:
                        print(f"RPM limit reached ({self.rpm}), waiting {wait_time:.1f} seconds")
                        await asyncio.sleep(wait_time)
                        continue
                
                # Check token rate
                current_tokens = sum(tokens for _, tokens in self.token_usage)
                if self.token_usage and current_tokens + estimated_tokens > self.tpm:
                    wait_time = 60 - (current_time - self.token_usage[0][0])
                    if wait_time > 0:
                        print(f"TPM limit reached ({self.tpm}), waiting {wait_time:.1f} seconds")
                        await asyncio.sleep(wait_time)
                        continue
                
                # Record request
                self.request_times.append(current_time)
                if estimated_tokens > 0:
                    self.token_usage.append((current_time, estimated_tokens))
                return
    
    def _cleanup_old_records(self, current_time: float):
        """Clean old records"""
        cutoff_time = current_time - 60
        
        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()
        
        while self.token_usage and self.token_usage[0][0] < cutoff_time:
            self.token_usage.popleft()

class MultiKeyRateLimiter:
    """Multi-key load-balanced rate limiter"""
    
    def __init__(self, api_keys: list, limits: Dict):
        self.api_keys = api_keys
        self.limiters = {
            key: TokenBucketRateLimiter(
                rpm=limits.get("rpm", 500),
                tpm=limits.get("tpm", 30000)
            )
            for key in api_keys
        }
        self.key_usage = {key: 0 for key in api_keys}
    
    async def get_available_key(self, estimated_tokens: int = 0) -> Optional[str]:
        """Get an available API key"""
        # Sort by usage
        sorted_keys = sorted(self.key_usage.items(), key=lambda x: x[1])
        
        for key, _ in sorted_keys:
            limiter = self.limiters[key]
            
            # Check if it can be used immediately
            if self._can_use_immediately(limiter, estimated_tokens):
                # Record the usage in the limiter (returns without sleeping,
                # since we just verified capacity)
                await limiter.wait_if_needed(estimated_tokens)
                self.key_usage[key] += 1
                return key
        
        # If none available immediately, wait on the least-used key
        least_used_key = sorted_keys[0][0]
        await self.limiters[least_used_key].wait_if_needed(estimated_tokens)
        self.key_usage[least_used_key] += 1
        return least_used_key
    
    def _can_use_immediately(self, limiter: TokenBucketRateLimiter, tokens: int) -> bool:
        """Check if can be used immediately"""
        current_time = time.time()
        limiter._cleanup_old_records(current_time)
        
        if len(limiter.request_times) >= limiter.rpm:
            return False
        
        current_tokens = sum(t for _, t in limiter.token_usage)
        if current_tokens + tokens > limiter.tpm:
            return False
        
        return True

# Usage example
api_keys = ["key1", "key2", "key3"]
limiter = MultiKeyRateLimiter(
    api_keys=api_keys,
    limits={"rpm": 500, "tpm": 30000}
)

async def make_request(prompt: str):
    estimated_tokens = len(prompt.split()) * 2  # Rough heuristic; see the tiktoken sketch below
    api_key = await limiter.get_available_key(estimated_tokens)
    print(f"Using API key: {api_key}")
    # Send request...
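
The word-count heuristic above is deliberately crude. For OpenAI-family models, the tiktoken library (pip install tiktoken) gives exact prompt token counts; a sketch:

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Exact token count for OpenAI models; falls back to a generic encoding."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")  # reasonable default
    return len(encoding.encode(text))

# estimated_tokens = count_tokens(prompt) + expected_completion_tokens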

4. Exponential backoff strategies

Smart retry mechanism

import random
import asyncio
from typing import Any, Callable, Optional

class ExponentialBackoff:
    """Exponential backoff retry strategy"""
    
    def __init__(
        self,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
    
    def get_delay(self, attempt: int) -> float:
        """Calculate backoff delay"""
        # Exponential growth
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )
        
        # Add jitter to avoid thundering herd
        if self.jitter:
            delay = delay * (0.5 + random.random() * 0.5)
        
        return delay

async def retry_with_backoff(
    func: Callable,
    max_attempts: int = 5,
    backoff: Optional[ExponentialBackoff] = None
) -> Any:
    """Retry wrapper with exponential backoff"""
    if backoff is None:
        backoff = ExponentialBackoff()
    
    last_exception = None
    
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            
            # Check if it's a rate limit error
            if hasattr(e, 'response') and e.response.status_code == 429:
                # Prefer Retry-After header from server
                retry_after = e.response.headers.get('Retry-After')
                if retry_after:
                    delay = int(retry_after)
                else:
                    delay = backoff.get_delay(attempt)
                
                print(f"Attempt {attempt + 1}/{max_attempts} failed, retrying in {delay:.1f}s")
                await asyncio.sleep(delay)
            else:
                # Non-rate-limit error; re-raise
                raise e
    
    # All retries failed
    raise Exception(f"Failed after {max_attempts} attempts") from last_exception

# Usage example
async def call_api():
    # API call logic (make_api_request and RateLimitError are placeholders)
    response = await make_api_request()
    if response.status_code == 429:
        raise RateLimitError(response)
    return response

# Custom backoff strategy
custom_backoff = ExponentialBackoff(
    base_delay=2.0,      # Initial delay 2 seconds
    max_delay=120.0,     # Max delay 2 minutes
    exponential_base=3.0,  # 3x growth
    jitter=True          # Add jitter
)

result = await retry_with_backoff(
    call_api,
    max_attempts=5,
    backoff=custom_backoff
)

🔄 Backoff strategies comparison

  • Fixed delay: simple, but retries can pile up and cause congestion
  • Linear backoff: wait time grows gradually with each attempt
  • Exponential backoff: wait time grows quickly; the most effective in practice (see the sketch below)
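
To make the comparison concrete, here is what the first five waits look like under each strategy with a 1-second base delay and no jitter:

base = 1.0
for attempt in range(5):
    fixed = base
    linear = base * (attempt + 1)
    exponential = base * (2 ** attempt)
    print(f"attempt {attempt}: fixed={fixed:.0f}s  linear={linear:.0f}s  exponential={exponential:.0f}s")

# attempt 0: fixed=1s  linear=1s  exponential=1s
# attempt 4: fixed=1s  linear=5s  exponential=16s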

5. Circuit breaker pattern

Service protection mechanism

from enum import Enum
from datetime import datetime, timedelta
import asyncio

class CircuitState(Enum):
    CLOSED = "closed"        # Normal: requests pass through
    OPEN = "open"            # Tripped: requests fail fast
    HALF_OPEN = "half_open"  # Probing: allow a trial request

class CircuitBreaker:
    """Circuit breaker implementation"""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func, *args, **kwargs):
        """Call a function through the circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open; service temporarily unavailable")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _should_attempt_reset(self) -> bool:
        """Check whether to attempt a reset"""
        return (
            self.last_failure_time and
            datetime.now() > self.last_failure_time + timedelta(seconds=self.recovery_timeout)
        )
    
    def _on_success(self):
        """Handle successful calls"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        """Handle failed calls"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} consecutive failures")

# Usage example
circuit_breaker = CircuitBreaker(
    failure_threshold=3,  # Open after 3 failures
    recovery_timeout=30   # Try recovery after 30 seconds
)

async def protected_api_call():
    try:
        return await circuit_breaker.call(make_api_request)
    except Exception as e:
        print(f"API call failed: {e}")
        # Return fallback response
        return get_fallback_response()
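
In a multi-provider setup you would typically keep one breaker per upstream and fail over when one trips. A minimal sketch (call_provider and the upstream names are hypothetical):

breakers = {
    name: CircuitBreaker(failure_threshold=3, recovery_timeout=30)
    for name in ("primary", "backup")
}

async def call_with_failover(payload: dict):
    for name in ("primary", "backup"):
        try:
            return await breakers[name].call(call_provider, name, payload)
        except Exception:
            continue  # This upstream is open or failing; try the next one
    raise RuntimeError("All upstreams unavailable")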

6. Monitoring and optimization

Rate limit monitoring system

import time
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimitMonitor:
    """Rate limit monitor"""
    
    def __init__(self):
        self.stats = defaultdict(lambda: {
            "requests": 0,
            "rate_limited": 0,
            "total_wait_time": 0,
            "last_limit_time": None
        })
    
    def record_request(self, endpoint: str):
        """Record a request"""
        self.stats[endpoint]["requests"] += 1
    
    def record_rate_limit(self, endpoint: str, wait_time: float):
        """Record a rate limit event"""
        stats = self.stats[endpoint]
        stats["rate_limited"] += 1
        stats["total_wait_time"] += wait_time
        stats["last_limit_time"] = datetime.now()
    
    def get_report(self) -> dict:
        """Generate monitoring report"""
        report = {}
        
        for endpoint, stats in self.stats.items():
            success_rate = 1 - (stats["rate_limited"] / stats["requests"]) if stats["requests"] else 0
            avg_wait = stats["total_wait_time"] / stats["rate_limited"] if stats["rate_limited"] > 0 else 0
            
            report[endpoint] = {
                "Total requests": stats["requests"],
                "Rate-limited count": stats["rate_limited"],
                "Success rate": f"{success_rate:.2%}",
                "Avg wait time": f"{avg_wait:.2f}s",
                "Last limited time": stats["last_limit_time"].strftime("%Y-%m-%d %H:%M:%S") if stats["last_limit_time"] else "N/A"
            }
        
        return report
    
    def get_recommendations(self) -> list:
        """Get optimization recommendations"""
        recommendations = []
        
        for endpoint, stats in self.stats.items():
            limit_rate = stats["rate_limited"] / stats["requests"] if stats["requests"] > 0 else 0
            
            if limit_rate > 0.1:  # Limit rate exceeds 10%
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"High rate limit ratio ({limit_rate:.1%})",
                    "suggestion": "Lower request frequency or use multiple API keys"
                })
            
            if stats["total_wait_time"] > 300:  # Total wait exceeds 5 minutes
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"Excessive wait time ({stats['total_wait_time']:.0f}s)",
                    "suggestion": "Implement request queueing and batching"
                })
        
        return recommendations

# Integrate into request handling
monitor = RateLimitMonitor()

async def monitored_request(endpoint: str, **kwargs):
    monitor.record_request(endpoint)
    
    try:
        response = await make_request(endpoint, **kwargs)
        return response
    except RateLimitError as e:
        wait_time = e.retry_after
        monitor.record_rate_limit(endpoint, wait_time)
        await asyncio.sleep(wait_time)
        return await monitored_request(endpoint, **kwargs)

# Periodic reporting
async def print_monitor_report():
    while True:
        await asyncio.sleep(300)  # Every 5 minutes
        print("\n=== Rate limit monitoring report ===")
        report = monitor.get_report()
        for endpoint, stats in report.items():
            print(f"\n{endpoint}:")
            for key, value in stats.items():
                print(f"  {key}: {value}")
        
        recommendations = monitor.get_recommendations()
        if recommendations:
            print("\nOptimization suggestions:")
            for rec in recommendations:
                print(f"  - {rec['endpoint']}: {rec['suggestion']}")

7. Best practices summary

🛡️ Prevention strategies

  • ✅ Implement request rate control
  • ✅ Use token bucket algorithms
  • ✅ Batch requests
  • ✅ Estimate token usage
  • ✅ Load balance across multiple API keys

🚨 Mitigation measures

  • ✅ Gracefully handle 429 errors
  • ✅ Retry with exponential backoff
  • ✅ Circuit breaker protection
  • ✅ Fallback solutions
  • ✅ Real-time monitoring alerts

✨ Advanced techniques

  • Use queues to smooth request spikes (see the sketch below)
  • Gradually ramp up traffic during warm-up
  • Allocate quotas by business priority
  • Periodically analyze rate limit patterns and optimize
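
As a concrete instance of the queue-smoothing technique above, a minimal sketch: a single worker drains an asyncio.Queue at a fixed pace, so bursts of submissions become a steady request stream. The interval is an assumption to tune, and make_request stands in for your actual call (as in section 3):

import asyncio

async def worker(queue: asyncio.Queue, interval: float = 0.2):
    """Drain the queue at a fixed pace, smoothing bursts into a steady rate."""
    while True:
        prompt, future = await queue.get()
        try:
            future.set_result(await make_request(prompt))  # placeholder call
        except Exception as e:
            future.set_exception(e)
        finally:
            queue.task_done()
        await asyncio.sleep(interval)  # pace: at most 1/interval requests per second

async def submit(queue: asyncio.Queue, prompt: str):
    """Enqueue a prompt and await its result."""
    future = asyncio.get_running_loop().create_future()
    await queue.put((prompt, future))
    return await future

# Start one paced worker and push bursts through it:
# queue = asyncio.Queue()
# asyncio.create_task(worker(queue))
# results = await asyncio.gather(*(submit(queue, p) for p in prompts))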