API Concurrent Call Best Practices

This guide covers high-concurrency API calling techniques: improving throughput, controlling costs, and keeping the system stable under load.

  • Concurrency Control: limit the number of simultaneous requests
  • Rate Limiting: stay within published rate limits
  • Error Retry: smart failure recovery
  • Performance Monitoring: real-time metrics tracking

1. Python Async Concurrency

asyncio Implementation

import asyncio
import aiohttp
from typing import List
import time

class ConcurrentAPIClient:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://api.n1n.ai/v1"
        # Semaphore caps how many requests are in flight at once
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def make_request(self, session: aiohttp.ClientSession, prompt: str):
        async with self.semaphore:  # Limit concurrent requests
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            payload = {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 100
            }

            async with session.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    return result['choices'][0]['message']['content']
                else:
                    raise Exception(f"API error: {response.status}")

    async def batch_process(self, prompts: List[str]):
        # One shared session reuses TCP connections across all requests
        async with aiohttp.ClientSession() as session:
            tasks = [self.make_request(session, p) for p in prompts]
            # return_exceptions=True isolates failures from successes
            return await asyncio.gather(*tasks, return_exceptions=True)

# Usage example
async def main():
    client = ConcurrentAPIClient("your-api-key", max_concurrent=10)
    prompts = [f"Translate: Hello {i}" for i in range(50)]
    
    start = time.time()
    results = await client.batch_process(prompts)
    print(f"Processing {len(prompts)} requests took: {time.time() - start:.2f} seconds")

asyncio.run(main())
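
Because batch_process calls asyncio.gather with return_exceptions=True, failed requests come back as exception objects mixed into the results rather than aborting the whole batch. A minimal sketch for separating the two:

# Inside an async function such as main() above:
results = await client.batch_process(prompts)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
print(f"{len(successes)} succeeded, {len(failures)} failed")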

Core Features

  • Semaphore concurrency control
  • Async non-blocking I/O
  • Batch processing
  • Error isolation

Performance Improvement

  • Up to ~10x faster than sequential calls
  • Efficient CPU utilization
  • Low memory footprint
  • Shorter end-to-end latency

Use Cases

  • Batch translation
  • Data processing
  • Content generation
  • API testing

2. Node.js Concurrency Control

Promise-based Concurrency

const axios = require('axios');
const pLimit = require('p-limit');  // p-limit v3.x (v4+ is ESM-only)

class ConcurrentAPIClient {
    constructor(apiKey, maxConcurrent = 10) {
        this.apiKey = apiKey;
        this.baseURL = 'https://api.n1n.ai/v1';
        this.limit = pLimit(maxConcurrent);
    }

    async makeRequest(prompt) {
        return this.limit(async () => {
            const response = await axios.post(
                `${this.baseURL}/chat/completions`,
                {
                    model: 'gpt-4o-mini',
                    messages: [{ role: 'user', content: prompt }],
                    max_tokens: 100
                },
                {
                    headers: {
                        'Authorization': `Bearer ${this.apiKey}`,
                        'Content-Type': 'application/json'
                    }
                }
            );
            return response.data.choices[0].message.content;
        });
    }

    async batchProcess(prompts) {
        const start = Date.now();
        const promises = prompts.map(p => this.makeRequest(p));
        const results = await Promise.all(promises);
        
        console.log(`Processing ${prompts.length} requests took: ${(Date.now() - start) / 1000} seconds`);
        return results;
    }
}

// Usage example
async function main() {
    const client = new ConcurrentAPIClient('your-api-key', 10);
    const prompts = Array.from({ length: 50 }, (_, i) => `Translate: Hello ${i}`);
    await client.batchProcess(prompts);
}

main().catch(console.error);

💡 Best Practices

  • Use p-limit to control concurrency
  • Use Promise.all for batch processing
  • Set a reasonable request timeout
  • Implement an error-retry mechanism

3. Smart Rate Limiting

Rate Limiter Implementation

import asyncio
import time

class RateLimiter:
    """Smart rate limiter.

    Note: request/token history is tracked in one shared window here,
    so keep a separate instance per model in practice.
    """
    def __init__(self):
        self.limits = {
            "gpt-4o": {"rpm": 500, "tpm": 30000},
            "gpt-4o-mini": {"rpm": 5000, "tpm": 200000}
        }
        self.request_times = []
        self.token_counts = []

    async def wait_if_needed(self, model: str, tokens: int):
        """Wait if necessary to comply with rate limits"""
        current_time = time.time()

        # Clean up records older than 60 seconds (sliding window)
        self.request_times = [t for t in self.request_times if t > current_time - 60]
        self.token_counts = [(t, c) for t, c in self.token_counts if t > current_time - 60]

        # Check RPM: if the window is full, wait until the oldest request expires
        if len(self.request_times) >= self.limits[model]["rpm"]:
            wait_time = 60 - (current_time - self.request_times[0])
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        # Check TPM: if this request would exceed the token budget, back off briefly
        total_tokens = sum(c for _, c in self.token_counts) + tokens
        if total_tokens > self.limits[model]["tpm"]:
            await asyncio.sleep(5)  # Wait for the token window to slide

        # Record this request
        self.request_times.append(current_time)
        self.token_counts.append((current_time, tokens))

API Limits

Model          RPM      TPM
GPT-4o         500      30K
GPT-4o mini    5,000    200K

Rate Limiting Strategies

  • Sliding window algorithm
  • Token estimation (see the sketch below)
  • Adaptive backoff
  • Priority queue
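
The token counts passed to wait_if_needed must be estimated before the request is sent. A minimal sketch using the common rule of thumb of roughly 4 characters per token for English text (estimate_tokens is an illustrative helper, not part of any API; for exact counts a tokenizer such as tiktoken would be needed):

def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Rough estimate; good enough for rate-limit budgeting."""
    return max(1, int(len(text) / chars_per_token))

# e.g. await limiter.wait_if_needed("gpt-4o-mini", estimate_tokens(prompt) + 100)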

4. Error Handling and Retry

Exponential Backoff Retry

import asyncio

import aiohttp
import backoff

class RobustAPIClient:
    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def make_request_with_retry(self, prompt: str):
        """Request with exponential backoff retry.

        Assumes self._make_request() performs the actual HTTP call and
        raises aiohttp.ClientResponseError on non-2xx responses
        (e.g. via response.raise_for_status()).
        """
        try:
            response = await self._make_request(prompt)
            return {"success": True, "data": response}
        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # Rate limited: honor the Retry-After header
                retry_after = int(e.headers.get('Retry-After', 5))
                await asyncio.sleep(retry_after)
                raise  # Re-raise so backoff retries
            elif e.status >= 500:  # Server error
                raise  # Re-raise so backoff retries
            else:  # Other 4xx client errors: retrying will not help
                return {"success": False, "error": str(e)}

5. Performance Monitoring

Monitoring Metrics

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful": 0,
            "failed": 0,
            "total_tokens": 0,
            "response_times": []
        }

    def record_success(self, response_time: float, tokens: int):
        self.metrics["total_requests"] += 1
        self.metrics["successful"] += 1
        self.metrics["total_tokens"] += tokens
        self.metrics["response_times"].append(response_time)

    def record_failure(self):
        self.metrics["total_requests"] += 1
        self.metrics["failed"] += 1

    def get_stats(self):
        times = self.metrics["response_times"]
        if not times:
            return {}
        avg_time = sum(times) / len(times)
        return {
            "Success Rate": f"{self.metrics['successful'] / self.metrics['total_requests'] * 100:.1f}%",
            "Avg Response Time": f"{avg_time:.2f} seconds",
            # Summed response times understate concurrent throughput;
            # divide by wall-clock elapsed time for a true figure
            "Throughput": f"{self.metrics['total_requests'] / sum(times):.1f} req/s"
        }

📊 Key Metrics

  • Success Rate: Monitor API call success rate
  • Response Time: P50, P95, P99 percentiles (see the sketch below)
  • Throughput: Requests processed per second
  • Error Distribution: Error type statistics
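
get_stats above reports only the average; a minimal sketch of the percentile view using the standard library (statistics.quantiles requires Python 3.8+ and at least two samples):

import statistics
from typing import Dict, List

def latency_percentiles(times: List[float]) -> Dict[str, float]:
    # quantiles(n=100) yields 99 cut points: index 49 -> P50, 94 -> P95, 98 -> P99
    qs = statistics.quantiles(times, n=100, method="inclusive")
    return {"P50": qs[49], "P95": qs[94], "P99": qs[98]}

# e.g. latency_percentiles(monitor.metrics["response_times"])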

6. Best Practices Summary

🚀 Performance Optimization

  • ✅ Set reasonable concurrency (10-50)
  • ✅ Reuse connections via a shared session / connection pool
  • ✅ Batch process similar requests
  • ✅ Implement request deduplication (see the sketch after this list)
  • ✅ Manage work with a priority queue
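
A hypothetical deduplication sketch (the Deduplicator helper is illustrative, not part of any library): identical in-flight prompts share a single task instead of triggering duplicate API calls:

import asyncio
import hashlib

class Deduplicator:
    def __init__(self):
        self._inflight = {}

    async def run(self, prompt, coro_factory):
        key = hashlib.sha256(prompt.encode()).hexdigest()
        task = self._inflight.get(key)
        if task is None:
            task = asyncio.create_task(coro_factory())
            self._inflight[key] = task
            # Forget the entry once the request finishes
            task.add_done_callback(lambda _: self._inflight.pop(key, None))
        return await task

# usage: await dedup.run(prompt, lambda: client.make_request_with_retry(prompt))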

🛡️ Stability Assurance

  • ✅ Exponential backoff retry
  • ✅ Reasonable timeout settings (see the sketch after this list)
  • ✅ Error isolation mechanism
  • ✅ Degradation strategy
  • ✅ Monitoring and alerts
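
A minimal timeout sketch for the aiohttp client from section 1 (the specific values are illustrative):

import aiohttp

async def make_session() -> aiohttp.ClientSession:
    # total: cap on the whole request; connect: connection setup; sock_read: gap between reads
    timeout = aiohttp.ClientTimeout(total=30, connect=5, sock_read=25)
    return aiohttp.ClientSession(timeout=timeout)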

Concurrency Recommendations

  • Development/Testing: 5-10 concurrent requests
  • Production: 20-50 concurrent requests
  • High Throughput: 100+ concurrent requests (requires multiple API keys; see the sketch below)
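
For the multi-key case, a hypothetical sketch (the key values are placeholders) that splits one large batch across one ConcurrentAPIClient per key:

import asyncio

api_keys = ["key-1", "key-2", "key-3"]  # illustrative placeholders
clients = [ConcurrentAPIClient(k, max_concurrent=50) for k in api_keys]

async def spread_batch(prompts):
    # Interleave prompts so each key carries an even share of the load
    chunks = [prompts[i::len(clients)] for i in range(len(clients))]
    batches = await asyncio.gather(*(c.batch_process(ch) for c, ch in zip(clients, chunks)))
    return [r for batch in batches for r in batch]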