# Rate Limiting

> 🔨 **In Development** — This section is still being developed and may change.

Understand and work effectively within Freddy API rate limits to build scalable applications.

Freddy APIs enforce rate limits to ensure reliable service for all customers. The limits you experience depend on the authentication method, organization tier, and specific endpoint.

## Default limits

- **API keys**: Standard limits that apply to most production workloads
- **Bearer tokens**: May use a different quota profile depending on the authenticated user
- **Background operations**: Often have separate concurrency controls

Contact [support@aitronos.com](mailto:support@aitronos.com) if you require higher limits for your organization.

## 📋 Overview

Rate limiting ensures fair usage and platform stability. The Freddy API uses a token bucket algorithm to limit request rates while allowing brief bursts of activity.
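The token bucket model is easy to reason about: tokens refill the bucket at a fixed rate, each request spends one token, and a full bucket is what permits the brief bursts described above. The sketch below is an illustrative client-side approximation only — the server's actual implementation is not documented here, and the capacity and refill numbers are taken from the Standard Plan figures below for illustration:

```python
import time

class TokenBucket:
    """Illustrative token bucket: refill at a fixed rate, spend one token per request."""

    def __init__(self, capacity=120, refill_rate=100 / 60.0):
        self.capacity = capacity        # maximum burst size (tokens)
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = float(capacity)   # start with a full bucket
        self.last_refill = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        # Add tokens for the elapsed time, capped at the bucket capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def try_acquire(self):
        """Spend a token and return True if one is available."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# A full bucket allows a burst of `capacity` requests; sustained throughput
# then settles at `refill_rate` requests per second.
bucket = TokenBucket(capacity=120, refill_rate=100 / 60.0)
print(bucket.try_acquire())  # True while tokens remain
```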
## 🎯 Rate Limit Tiers

### Standard Plan
- **100 requests/minute**
- **10,000 requests/day**
- **Burst allowance:** Up to 120 requests in first minute

### Premium Plan
- **1,000 requests/minute**
- **100,000 requests/day**
- **Burst allowance:** Up to 1,200 requests in first minute

### Enterprise Plan
- **Custom limits** based on needs
- **Dedicated capacity**
- **Priority support**
- **SLA guarantees**

## 📊 Rate Limit Headers

Every API response includes rate limit information:

```http
HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 95
X-RateLimit-Reset: 1640995200
X-RateLimit-Retry-After: 0
```

### Header Descriptions

| Header | Description |
| --- | --- |
| `X-RateLimit-Limit` | Maximum requests allowed per window |
| `X-RateLimit-Remaining` | Requests remaining in current window |
| `X-RateLimit-Reset` | Unix timestamp when limit resets |
| `X-RateLimit-Retry-After` | Seconds to wait before retrying (when limited) |

## 🚨 Rate Limit Exceeded

When you exceed limits, the API returns **429 Too Many Requests**; use the `Retry-After` header to determine when you can retry:

```http
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1640995200
```

```json
{
  "detail": "Rate limit exceeded",
  "status_code": 429,
  "error_type": "RateLimitError",
  "retry_after": 60,
  "limit": 100,
  "reset_at": "2024-01-01T00:01:00Z"
}
```

## 🔧 Handling Rate Limits

Recommended strategy:

1. Implement exponential backoff with jitter (see the jitter sketch below)
2. Reduce request frequency when throttled responses occur
3. Cache expensive responses whenever possible
4. Batch operations to minimize repeated calls

### Basic Retry Logic

```python
import time

import requests

def make_request_with_retry(url, headers, max_retries=3):
    """Make request with automatic retry on rate limit."""
    for attempt in range(max_retries + 1):
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:
            # Rate limited - check retry header
            retry_after = int(response.headers.get('Retry-After', 60))
            if attempt < max_retries:
                print(f"Rate limited. Waiting {retry_after} seconds...")
                time.sleep(retry_after)
                continue
            else:
                raise Exception("Max retries exceeded")
        else:
            response.raise_for_status()

    raise Exception("Request failed")

# Usage
result = make_request_with_retry(
    "https://api.freddy.ai/v2/models",
    headers={"api-key": "ak_your_key"}
)
```
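The basic retry above sleeps for exactly `Retry-After` seconds. When many clients are throttled at the same moment, that can synchronize their retries into repeated bursts; the exponential-backoff-with-jitter strategy recommended earlier avoids this by randomizing the wait. A minimal sketch — the `base` and `cap` values here are illustrative defaults, not documented limits:

```python
import random
import time

import requests

def backoff_with_jitter(url, headers, max_retries=5, base=1.0, cap=60.0):
    """Retry with 'full jitter': sleep a random time up to an exponentially growing cap."""
    for attempt in range(max_retries + 1):
        response = requests.get(url, headers=headers)
        if response.status_code != 429:
            response.raise_for_status()  # surface non-throttling errors
            return response.json()
        # Honor Retry-After when the server sends it; otherwise use full jitter
        retry_after = response.headers.get('Retry-After')
        if retry_after is not None:
            delay = int(retry_after)
        else:
            delay = random.uniform(0, min(cap, base * 2 ** attempt))
        print(f"Throttled (attempt {attempt + 1}). Sleeping {delay:.1f}s...")
        time.sleep(delay)
    raise Exception("Max retries exceeded")
```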
### Rate Limit Manager

```python
import time
from collections import deque
from datetime import datetime, timedelta

import requests

class RateLimitManager:
    """Proactive rate limit management."""

    def __init__(self, requests_per_minute=100, requests_per_day=10000):
        self.rpm_limit = requests_per_minute
        self.rpd_limit = requests_per_day
        # Track requests in the last minute
        self.minute_requests = deque()
        # Track requests in the last day
        self.daily_requests = deque()

    def can_make_request(self):
        """Check if we can make a request without hitting limits."""
        self._cleanup_old_requests()
        # Check minute limit
        if len(self.minute_requests) >= self.rpm_limit:
            return False
        # Check daily limit
        if len(self.daily_requests) >= self.rpd_limit:
            return False
        return True

    def wait_if_needed(self):
        """Wait if necessary before making a request."""
        self._cleanup_old_requests()
        now = datetime.now()

        # Check if we need to wait for the minute limit
        if len(self.minute_requests) >= self.rpm_limit:
            oldest_request = self.minute_requests[0]  # deque is time-ordered
            wait_time = 60 - (now - oldest_request).total_seconds()
            if wait_time > 0:
                print(f"Rate limit approaching. Waiting {wait_time:.1f}s...")
                time.sleep(wait_time)
                self._cleanup_old_requests()

        # Check if we need to wait for the daily limit
        if len(self.daily_requests) >= self.rpd_limit:
            oldest_request = self.daily_requests[0]
            wait_time = 86400 - (now - oldest_request).total_seconds()
            if wait_time > 0:
                print(f"Daily limit reached. Waiting {wait_time/3600:.1f} hours...")
                time.sleep(wait_time)
                self._cleanup_old_requests()

    def record_request(self):
        """Record that a request was made."""
        now = datetime.now()
        self.minute_requests.append(now)
        self.daily_requests.append(now)

    def _cleanup_old_requests(self):
        """Remove requests older than the tracking windows."""
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        day_ago = now - timedelta(days=1)
        # Clean minute requests
        while self.minute_requests and self.minute_requests[0] < minute_ago:
            self.minute_requests.popleft()
        # Clean daily requests
        while self.daily_requests and self.daily_requests[0] < day_ago:
            self.daily_requests.popleft()

    def get_stats(self):
        """Get current usage statistics."""
        self._cleanup_old_requests()
        return {
            "requests_this_minute": len(self.minute_requests),
            "requests_today": len(self.daily_requests),
            "rpm_limit": self.rpm_limit,
            "rpd_limit": self.rpd_limit,
            "rpm_remaining": self.rpm_limit - len(self.minute_requests),
            "rpd_remaining": self.rpd_limit - len(self.daily_requests),
        }

# Usage
rate_limiter = RateLimitManager(requests_per_minute=100, requests_per_day=10000)

def make_rate_limited_request(url, headers):
    # Wait if needed
    rate_limiter.wait_if_needed()
    # Make request
    response = requests.get(url, headers=headers)
    # Record request
    rate_limiter.record_request()
    return response.json()

# Check usage stats
stats = rate_limiter.get_stats()
print(f"Requests this minute: {stats['requests_this_minute']}/{stats['rpm_limit']}")
print(f"Requests today: {stats['requests_today']}/{stats['rpd_limit']}")
```

### Adaptive Rate Limiting

```python
import time

import requests

class AdaptiveRateLimiter:
    """Adjust request rate based on server responses."""

    def __init__(self, initial_rate=100):
        self.current_rate = initial_rate
        self.max_rate = initial_rate
        self.min_rate = 10
        self.success_count = 0
        self.last_request_time = None

    def calculate_delay(self):
        """Calculate delay between requests."""
        return 60.0 / self.current_rate

    def record_success(self, response):
        """Record successful request and adjust rate."""
        self.success_count += 1
        # Get current rate limit from headers
        remaining = int(response.headers.get('X-RateLimit-Remaining', 0))
        limit = int(response.headers.get('X-RateLimit-Limit', self.max_rate))
        # Gradually increase rate if we have headroom
        if remaining > limit * 0.5 and self.current_rate < self.max_rate:
            self.current_rate = min(self.current_rate * 1.1, self.max_rate)
        # Slow down if getting close to the limit
        elif remaining < limit * 0.2:
            self.current_rate = max(self.current_rate * 0.8, self.min_rate)

    def record_rate_limit(self, retry_after):
        """Record a rate limit hit and back off."""
        # Significantly reduce rate
        self.current_rate = max(self.current_rate * 0.5, self.min_rate)
        self.success_count = 0
        # Wait the specified time
        time.sleep(retry_after)

    def wait_if_needed(self):
        """Wait the appropriate time between requests."""
        if self.last_request_time:
            delay = self.calculate_delay()
            elapsed = time.time() - self.last_request_time
            if elapsed < delay:
                time.sleep(delay - elapsed)
        self.last_request_time = time.time()

# Usage
limiter = AdaptiveRateLimiter(initial_rate=100)

def make_adaptive_request(url, headers):
    # Loop instead of recursing, so sustained throttling can't exhaust the stack
    while True:
        limiter.wait_if_needed()
        response = requests.get(url, headers=headers)

        if response.status_code == 429:
            retry_after = int(response.headers.get('Retry-After', 60))
            limiter.record_rate_limit(retry_after)
            continue  # Retry after backing off
        elif response.status_code == 200:
            limiter.record_success(response)
            return response.json()
        else:
            response.raise_for_status()
```
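Note that `RateLimitManager` and `AdaptiveRateLimiter` both mutate shared state, so neither is thread-safe as written; this matters if you share one limiter across worker threads, as the request-prioritization example later in this page does. One possible fix (a sketch, not the only design) is to serialize access with a lock:

```python
import threading

class ThreadSafeLimiter:
    """Wrap a limiter so wait/record happen atomically across threads."""

    def __init__(self, limiter):
        self._limiter = limiter
        self._lock = threading.Lock()

    def wait_if_needed(self):
        # Holding the lock while waiting intentionally serializes requests,
        # which is the behavior a shared rate limit calls for.
        with self._lock:
            self._limiter.wait_if_needed()

    def record_request(self):
        with self._lock:
            self._limiter.record_request()

# Usage: share one wrapped limiter between threads
# (assumes the RateLimitManager class from the example above is in scope)
shared_limiter = ThreadSafeLimiter(RateLimitManager())
```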
## 📊 Monitoring Usage

Beyond the response headers, you can:

- Use [Get organization usage limits](/docs/api-reference/organizations/analytics/get-usage-limit) for monthly quota details
- Track real-time consumption in [Freddy Hub](https://freddy-hub.aitronos.com)
- Implement alerting when usage approaches 80% of your limit

### Track Request Patterns

```python
import logging
from datetime import datetime

import requests

class UsageMonitor:
    """Monitor and log API usage patterns."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.request_log = []

    def log_request(self, endpoint, response):
        """Log request details."""
        usage_data = {
            'timestamp': datetime.now().isoformat(),
            'endpoint': endpoint,
            'status_code': response.status_code,
            'rate_limit_remaining': response.headers.get('X-RateLimit-Remaining'),
            'rate_limit_limit': response.headers.get('X-RateLimit-Limit'),
        }
        self.request_log.append(usage_data)

        # Alert if running low on rate limit
        remaining = int(response.headers.get('X-RateLimit-Remaining', 100))
        limit = int(response.headers.get('X-RateLimit-Limit', 100))
        if remaining < limit * 0.1:
            self.logger.warning(f"Low rate limit: {remaining}/{limit} remaining")

    def get_usage_summary(self):
        """Get usage summary."""
        total_requests = len(self.request_log)
        rate_limited = sum(1 for r in self.request_log if r['status_code'] == 429)
        return {
            'total_requests': total_requests,
            'rate_limited_requests': rate_limited,
            'rate_limit_percentage': (rate_limited / total_requests * 100) if total_requests > 0 else 0,
        }

# Usage
monitor = UsageMonitor()

def make_monitored_request(endpoint, headers):
    url = f"https://api.freddy.ai/v2/{endpoint}"
    response = requests.get(url, headers=headers)
    monitor.log_request(endpoint, response)
    return response

# Get usage summary
summary = monitor.get_usage_summary()
print(f"Total requests: {summary['total_requests']}")
print(f"Rate limited: {summary['rate_limited_requests']} ({summary['rate_limit_percentage']:.1f}%)")
```

## 🎯 Optimization Strategies

### 1. Request Batching

Combine multiple operations when possible:

```python
# ❌ Bad: Multiple separate requests
for user_id in user_ids:
    user = get_user(user_id)  # 100 requests

# ✅ Good: Single batch request
users = get_users_batch(user_ids)  # 1 request
```

### 2. Caching

Cache responses to reduce requests:

```python
import json

import redis
import requests

class CachedAPIClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.cache = redis.Redis(host='localhost', port=6379, db=0)
        self.default_ttl = 300  # 5 minutes

    def get_models(self, use_cache=True):
        """Get models with caching."""
        cache_key = "models:list"

        # Check cache first
        if use_cache:
            cached = self.cache.get(cache_key)
            if cached:
                return json.loads(cached)

        # Make API request
        response = requests.get(
            "https://api.freddy.ai/v2/models",
            headers={"api-key": self.api_key}
        )
        data = response.json()

        # Cache the result
        self.cache.setex(cache_key, self.default_ttl, json.dumps(data))
        return data
```
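The client above assumes a Redis server at `localhost:6379`. If you don't run Redis, the same cache-aside pattern works with a small in-process cache — a sketch, where the `ttl` value is illustrative and the cache only lives in the current process:

```python
import time

import requests

class InMemoryCache:
    """Tiny TTL cache: fine for a single process, no external dependencies."""

    def __init__(self, ttl=300):
        self.ttl = ttl
        self._store = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # entry expired; drop it
            return None
        return value

    def set(self, key, value):
        self._store[key] = (time.monotonic() + self.ttl, value)

cache = InMemoryCache(ttl=300)

def get_models_cached(api_key):
    """Same cache-aside pattern as above, without Redis."""
    cached = cache.get("models:list")
    if cached is not None:
        return cached
    response = requests.get(
        "https://api.freddy.ai/v2/models",
        headers={"api-key": api_key},
    )
    data = response.json()
    cache.set("models:list", data)
    return data
```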
### 3. Request Prioritization

Prioritize critical requests:

```python
import itertools
import threading
from queue import Empty, PriorityQueue

import requests

class PrioritizedRequestQueue:
    def __init__(self, rate_limiter):
        self.queue = PriorityQueue()
        self.rate_limiter = rate_limiter
        self.running = False
        # Monotonic tiebreaker so equal-priority entries never compare dicts
        self._counter = itertools.count()

    def add_request(self, priority, url, headers, callback):
        """Add request to queue with priority (lower number = higher priority)."""
        self.queue.put((priority, next(self._counter), url, headers, callback))

    def process_queue(self):
        """Process requests in priority order."""
        while self.running:
            try:
                # Block briefly instead of busy-waiting on an empty queue
                priority, _, url, headers, callback = self.queue.get(timeout=1)
            except Empty:
                continue
            self.rate_limiter.wait_if_needed()
            try:
                response = requests.get(url, headers=headers)
                callback(response.json())
            except Exception as e:
                print(f"Request failed: {e}")

# Usage
queue = PrioritizedRequestQueue(rate_limiter)
queue.running = True

# Start processing thread
threading.Thread(target=queue.process_queue, daemon=True).start()

# Add requests with priorities
queue.add_request(1, critical_url, headers, handle_critical)  # High priority
queue.add_request(5, normal_url, headers, handle_normal)      # Normal priority
queue.add_request(10, low_url, headers, handle_low)           # Low priority
```

## 📚 Best Practices

### 1. Respect Rate Limits
- Always check rate limit headers
- Implement automatic backoff
- Never ignore 429 responses

### 2. Be Efficient
- Cache when possible
- Batch operations
- Use webhooks instead of polling

### 3. Monitor Usage
- Track request patterns
- Set up alerts for high usage
- Review usage analytics regularly

### 4. Plan for Scale
- Design with rate limits in mind
- Implement queuing for bursts
- Consider upgrading plan if needed

## 📊 Upgrading Your Plan

If you consistently hit rate limits:

1. **Review usage patterns** - Optimize first
2. **Consider Premium** - 10x higher limits
3. **Explore Enterprise** - Custom limits and SLA
4. **Contact sales** - Discuss specific needs

## 📚 Next Steps

- **[Best Practices](/docs/documentation/best-practices)** - Recommended patterns
- **[Error Handling](/docs/documentation/error-handling)** - Handle rate limit errors
- **[Code Examples](/docs/documentation/examples)** - Implementation examples
- **[API Reference](/docs/api-reference/introduction)** - Complete documentation

## Related resources

- [Authentication](/docs/documentation/authentication)
- [Background mode](/docs/documentation/running-methods/background-mode)
- [Synapses and neurons](/docs/documentation/core-concepts/synapses-and-neurons)

*Build scalable applications within rate limits!* 📊