API Concurrent Call Best Practices
Master high-concurrency API calling techniques to improve processing efficiency, optimize costs, and ensure system stability.
- • Concurrency Control: limit the number of simultaneous requests
- • Rate Limiting: comply with provider rate limits
- • Error Retry: recover intelligently from failures
- • Performance Monitoring: track metrics in real time
1. Python Async Concurrency
asyncio Implementation
```python
import asyncio
import aiohttp
from typing import List
import time

class ConcurrentAPIClient:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://api.n1n.ai/v1"
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def make_request(self, prompt: str):
        async with self.semaphore:  # Limit concurrent requests
            async with aiohttp.ClientSession() as session:
                headers = {
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
                payload = {
                    "model": "gpt-4o-mini",
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 100
                }
                async with session.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        return result['choices'][0]['message']['content']
                    else:
                        raise Exception(f"API error: {response.status}")

    async def batch_process(self, prompts: List[str]):
        # return_exceptions=True isolates failures: one bad request
        # does not cancel the rest of the batch
        tasks = [self.make_request(p) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Usage example
async def main():
    client = ConcurrentAPIClient("your-api-key", max_concurrent=10)
    prompts = [f"Translate: Hello {i}" for i in range(50)]
    start = time.time()
    results = await client.batch_process(prompts)
    print(f"Processing {len(prompts)} requests took: {time.time() - start:.2f} seconds")

asyncio.run(main())
```
Core Features
- • Semaphore concurrency control
- • Async non-blocking
- • Batch processing
- • Error isolation
Performance Improvement
- • Up to ~10x faster than sequential calls for I/O-bound batches
- • A single thread stays busy instead of blocking on each response
- • Low memory overhead (no thread per request)
- • Shorter end-to-end latency for the batch
Use Cases
- • Batch translation
- • Data processing
- • Content generation
- • API testing
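Note that `make_request` above opens a new `ClientSession` per call. For large batches it is usually cheaper to share one session, so the connection pool and TLS handshakes are reused. A minimal sketch under that assumption (`batch_with_shared_session` is an illustrative helper, not part of the client above):

```python
import asyncio
import aiohttp

async def batch_with_shared_session(api_key: str, prompts, max_concurrent: int = 10):
    """One ClientSession for the whole batch, so connections are pooled."""
    semaphore = asyncio.Semaphore(max_concurrent)
    headers = {"Authorization": f"Bearer {api_key}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        async def one(prompt: str):
            async with semaphore:  # same concurrency cap as the class above
                async with session.post(
                    "https://api.n1n.ai/v1/chat/completions",
                    json={
                        "model": "gpt-4o-mini",
                        "messages": [{"role": "user", "content": prompt}],
                        "max_tokens": 100,
                    },
                ) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                    return data["choices"][0]["message"]["content"]

        return await asyncio.gather(*(one(p) for p in prompts), return_exceptions=True)
```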
2. Node.js Concurrency Control
Promise Concurrency
```javascript
const axios = require('axios');
// Note: p-limit >= 4 is ESM-only; use p-limit@3 with require(), or use import
const pLimit = require('p-limit');

class ConcurrentAPIClient {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.baseURL = 'https://api.n1n.ai/v1';
    this.limit = pLimit(maxConcurrent);
  }

  async makeRequest(prompt) {
    return this.limit(async () => {
      const response = await axios.post(
        `${this.baseURL}/chat/completions`,
        {
          model: 'gpt-4o-mini',
          messages: [{ role: 'user', content: prompt }],
          max_tokens: 100
        },
        {
          headers: {
            'Authorization': `Bearer ${this.apiKey}`,
            'Content-Type': 'application/json'
          }
        }
      );
      return response.data.choices[0].message.content;
    });
  }

  async batchProcess(prompts) {
    const start = Date.now();
    const promises = prompts.map(p => this.makeRequest(p));
    const results = await Promise.all(promises);
    console.log(`Processing ${prompts.length} requests took: ${(Date.now() - start) / 1000} seconds`);
    return results;
  }
}

// Usage example
async function main() {
  const client = new ConcurrentAPIClient('your-api-key', 10);
  const prompts = Array.from({ length: 50 }, (_, i) => `Translate: Hello ${i}`);
  await client.batchProcess(prompts);
}

main().catch(console.error);
```
💡 Best Practices
- • Use p-limit to control concurrency
- • Promise.all for batch processing (Promise.allSettled if one failure should not abort the batch)
- • Set a reasonable request timeout
- • Implement an error-retry mechanism
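The timeout advice applies to the Python client as well. A hedged sketch using aiohttp's `ClientTimeout` (the 30 s / 10 s values are illustrative, not recommendations); in the axios client above, the rough equivalent is the `timeout` config option, in milliseconds:

```python
import aiohttp

# Illustrative values: fail after 30 s total, 10 s to establish a connection
TIMEOUT = aiohttp.ClientTimeout(total=30, connect=10)

async def post_with_timeout(url: str, payload: dict, headers: dict):
    """POST that gives up instead of hanging on a stalled connection."""
    async with aiohttp.ClientSession(timeout=TIMEOUT) as session:
        async with session.post(url, json=payload, headers=headers) as resp:
            return await resp.json()
```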
3. Smart Rate Limiting
Rate Limiter Implementation
```python
import asyncio
import time

class RateLimiter:
    """Sliding-window rate limiter covering both RPM and TPM limits"""
    def __init__(self):
        self.limits = {
            "gpt-4o": {"rpm": 500, "tpm": 30000},
            "gpt-4o-mini": {"rpm": 5000, "tpm": 200000}
        }
        self.request_times = []
        self.token_counts = []

    async def wait_if_needed(self, model: str, tokens: int):
        """Wait if necessary to comply with rate limits"""
        current_time = time.time()
        # Drop records older than the 60-second window
        self.request_times = [t for t in self.request_times if t > current_time - 60]
        self.token_counts = [(t, c) for t, c in self.token_counts if t > current_time - 60]
        # Check requests per minute (RPM)
        if len(self.request_times) >= self.limits[model]["rpm"]:
            wait_time = 60 - (current_time - self.request_times[0])
            if wait_time > 0:
                await asyncio.sleep(wait_time)
        # Check tokens per minute (TPM)
        total_tokens = sum(c for _, c in self.token_counts) + tokens
        if total_tokens > self.limits[model]["tpm"]:
            await asyncio.sleep(5)  # Wait for the token window to slide
        # Record this request
        self.request_times.append(current_time)
        self.token_counts.append((current_time, tokens))
```
API Limits
| Model | RPM | TPM |
|---|---|---|
| GPT-4o | 500 | 30K |
| GPT-4o mini | 5000 | 200K |
Rate Limiting Strategies
- • Sliding window algorithm
- • Token estimation
- • Adaptive backoff
- • Priority queue
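A sketch of wiring the limiter into the async client from section 1 (assumes the `RateLimiter` and `ConcurrentAPIClient` classes above; the 4-characters-per-token estimate is a rough heuristic, not a real tokenizer):

```python
limiter = RateLimiter()

async def rate_limited_request(client: ConcurrentAPIClient, prompt: str):
    # Rough token estimate: ~4 characters per token, plus the max_tokens budget
    estimated_tokens = len(prompt) // 4 + 100
    await limiter.wait_if_needed("gpt-4o-mini", estimated_tokens)
    return await client.make_request(prompt)
```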
4. Error Handling and Retry
Exponential Backoff Retry
```python
import asyncio
import aiohttp
import backoff

class RobustAPIClient:
    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def make_request_with_retry(self, prompt: str):
        """Request with exponential backoff retry"""
        try:
            response = await self._make_request(prompt)
            return {"success": True, "data": response}
        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # Rate limited
                retry_after = int(e.headers.get('Retry-After', 5))
                await asyncio.sleep(retry_after)
                raise  # Re-raise so backoff retries
            elif e.status >= 500:  # Server error
                raise  # Re-raise so backoff retries
            else:
                return {"success": False, "error": str(e)}  # Other 4xx: do not retry
```
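As an alternative to the manual re-raise logic, `backoff.on_exception` also accepts a `giveup` predicate that stops retries for errors that cannot succeed. A minimal sketch (`fetch_json` and its URL parameter are illustrative):

```python
import aiohttp
import backoff

def is_non_retryable(exc: Exception) -> bool:
    """4xx errors other than 429 will not succeed on retry, so give up."""
    return (
        isinstance(exc, aiohttp.ClientResponseError)
        and exc.status != 429
        and exc.status < 500
    )

@backoff.on_exception(backoff.expo, aiohttp.ClientError,
                      max_tries=3, giveup=is_non_retryable)
async def fetch_json(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        resp.raise_for_status()  # raises ClientResponseError carrying .status
        return await resp.json()
```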
5. Performance Monitoring
Monitoring Metrics
```python
class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful": 0,
            "failed": 0,
            "total_tokens": 0,
            "response_times": []
        }

    def record_success(self, response_time: float, tokens: int):
        self.metrics["total_requests"] += 1
        self.metrics["successful"] += 1
        self.metrics["total_tokens"] += tokens
        self.metrics["response_times"].append(response_time)

    def get_stats(self):
        times = self.metrics["response_times"]
        if not times:
            return {}  # Avoid division by zero before any request is recorded
        avg_time = sum(times) / len(times)
        return {
            "Success Rate": f"{self.metrics['successful'] / self.metrics['total_requests'] * 100:.1f}%",
            "Avg Response Time": f"{avg_time:.2f} seconds",
            # Note: summing latencies understates true wall-clock throughput
            # when requests overlap; dividing by elapsed wall time is more accurate
            "Throughput": f"{self.metrics['total_requests'] / sum(times):.1f} req/s"
        }
```
📊 Key Metrics
- • Success Rate: share of API calls that succeed
- • Response Time: P50, P95, P99 percentiles (see the sketch after this list)
- • Throughput: requests processed per second
- • Error Distribution: counts by error type
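`PerformanceMonitor` stores raw response times, so percentiles can be computed on demand. A sketch using a simple nearest-rank method (the sample latencies are made up; `statistics.quantiles` from the standard library is an alternative):

```python
def percentile(times, pct: float) -> float:
    """Nearest-rank percentile over a list of latencies in seconds."""
    ordered = sorted(times)
    index = min(len(ordered) - 1, int(len(ordered) * pct / 100))
    return ordered[index]

monitor = PerformanceMonitor()
for t in [0.4, 0.5, 0.6, 1.2, 3.0]:  # made-up sample latencies
    monitor.record_success(t, tokens=100)

times = monitor.metrics["response_times"]
print({p: percentile(times, p) for p in (50, 95, 99)})
```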
6. Best Practices Summary
🚀 Performance Optimization
- ✅ Set a reasonable concurrency level (10-50)
- ✅ Reuse connections via a connection pool
- ✅ Batch similar requests together
- ✅ Implement request deduplication (see the sketch after this list)
- ✅ Manage work with a priority queue
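A sketch of the deduplication idea: concurrently issued identical prompts share a single in-flight call (assumes the `ConcurrentAPIClient.make_request` coroutine from section 1; the SHA-256 keying is one possible choice):

```python
import asyncio
import hashlib

_inflight = {}  # prompt hash -> in-flight task shared by duplicate callers

async def deduplicated_request(client, prompt: str):
    """Concurrent identical prompts share one underlying API call."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    task = _inflight.get(key)
    if task is None:
        task = asyncio.ensure_future(client.make_request(prompt))
        _inflight[key] = task
    try:
        return await task
    finally:
        _inflight.pop(key, None)  # later duplicates start a fresh request
```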
🛡️ Stability Assurance
- ✅ Exponential backoff retry
- ✅ Reasonable timeout settings
- ✅ Error isolation mechanism
- ✅ Degradation strategy (see the sketch after this list)
- ✅ Monitoring alerts
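One way to read the degradation item: when the primary model fails, retry the same prompt on a cheaper model rather than failing the request outright. A hedged sketch where `call_model` is any async helper you supply (hypothetical, not defined in this article):

```python
async def request_with_degradation(call_model, prompt: str):
    """call_model(model, prompt) is a hypothetical async helper for one model."""
    for model in ("gpt-4o", "gpt-4o-mini"):  # primary first, then cheaper fallback
        try:
            return await call_model(model, prompt)
        except Exception:
            continue  # degrade to the next model
    raise RuntimeError("All models failed")
```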
Concurrency Recommendations
- • Development and testing: 5-10 concurrent requests
- • Production: 20-50 concurrent requests
- • High throughput: 100+ concurrent requests (requires multiple API keys; see the rotation sketch below)
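For the multi-key tier, a common pattern is round-robin key rotation so each key stays under its own rate limit. A minimal sketch (assumes the `ConcurrentAPIClient` class from section 1; the key list is a placeholder):

```python
import itertools

API_KEYS = ["key-1", "key-2", "key-3"]  # placeholder keys
_key_cycle = itertools.cycle(API_KEYS)

def next_client(max_concurrent: int = 50) -> "ConcurrentAPIClient":
    """Round-robin across keys so each key stays under its own rate limit."""
    return ConcurrentAPIClient(next(_key_cycle), max_concurrent=max_concurrent)
```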