Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion segment/analytics/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DefaultConfig(object):
max_queue_size = 10000
gzip = False
timeout = 15
max_retries = 10
max_retries = 1000
proxies = None
thread = 1
upload_interval = 0.5
Expand Down
142 changes: 105 additions & 37 deletions segment/analytics/consumer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging
import time
import random
from threading import Thread
import backoff
import json

from segment.analytics.request import post, APIError, DatetimeSerializer
from segment.analytics.request import post, APIError, DatetimeSerializer, parse_retry_after

from queue import Empty

Expand All @@ -29,7 +29,7 @@ class Consumer(Thread):
log = logging.getLogger('segment')

def __init__(self, queue, write_key, upload_size=100, host=None,
on_error=None, upload_interval=0.5, gzip=False, retries=10,
on_error=None, upload_interval=0.5, gzip=False, retries=1000,
timeout=15, proxies=None, oauth_manager=None):
"""Create a consumer thread."""
Thread.__init__(self)
Expand Down Expand Up @@ -120,40 +120,108 @@ def next(self):
return items

def request(self, batch):
    """Attempt to upload the batch, retrying transient failures.

    The batch is posted in a loop until it succeeds or a terminal
    condition is hit:

    * ``FatalError`` is logged and re-raised immediately.
    * An ``APIError`` whose status is 408, 429 or 503 and whose response
      carries a usable ``Retry-After`` value sleeps for that long and
      retries without consuming the backoff budget.
    * An ``APIError`` with a non-retryable status is logged and re-raised.
    * Any other ``APIError`` or exception consumes one backoff attempt
      and sleeps for an exponentially growing, jittered delay capped at
      60 seconds. Once ``self.retries`` backoff retries have been used
      the last error is re-raised.

    Every call to ``post`` is given ``retry_count`` equal to the number
    of failed attempts made so far for this batch.

    Returns whatever ``post`` returns on success.
    """

    def is_retryable_status(status):
        """
        Determine if a status code is retryable.
        Retryable 4xx: 408, 410, 429, 460
        Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx
        Retryable 5xx: All except 501, 505
        Non-retryable 5xx: 501, 505
        """
        if 400 <= status < 500:
            return status in (408, 410, 429, 460)
        if 500 <= status < 600:
            return status not in (501, 505)
        return False

    def should_use_retry_after(status):
        """Check if status code should respect Retry-After header"""
        return status in (408, 429, 503)

    def backoff_delay(attempt):
        """Return the jittered exponential delay (seconds) for `attempt`."""
        # Clamp the exponent before exponentiating: any base delay of 60s
        # or more ends up at the 60s cap anyway, and an unclamped
        # 2 ** (attempt - 1) overflows float conversion for very large
        # retry budgets.
        exponent = min(attempt - 1, 16)
        base_delay = 0.5 * (2 ** exponent)
        jitter = random.uniform(0, 0.1 * base_delay)
        return min(base_delay + jitter, 60)  # Cap at 60 seconds

    total_attempts = 0
    backoff_attempts = 0
    max_backoff_attempts = self.retries + 1

    while True:
        try:
            # Make the request with current retry count
            return post(
                self.write_key,
                self.host,
                gzip=self.gzip,
                timeout=self.timeout,
                batch=batch,
                proxies=self.proxies,
                oauth_manager=self.oauth_manager,
                retry_count=total_attempts
            )

        except FatalError as e:
            # Non-retryable error
            self.log.error(f"Fatal error after {total_attempts} attempts: {e}")
            raise

        except APIError as e:
            total_attempts += 1

            # Compare against None explicitly: an HTTP response object for
            # a 4xx/5xx status can evaluate as falsy, which would make a
            # plain truthiness test skip the Retry-After handling entirely.
            response = getattr(e, 'response', None)
            if should_use_retry_after(e.status) and response is not None:
                retry_after = parse_retry_after(response)
                # Only sleep on a strictly positive delay; zero or an
                # unexpected negative value falls through to backoff.
                if retry_after is not None and retry_after > 0:
                    self.log.debug(
                        f"Retry-After header present: waiting {retry_after}s (attempt {total_attempts})"
                    )
                    time.sleep(retry_after)
                    # NOTE(review): server-directed retries do not count
                    # against the backoff budget, so a server that always
                    # sends Retry-After keeps this loop going indefinitely.
                    continue

            # Check if status is retryable
            if not is_retryable_status(e.status):
                self.log.error(
                    f"Non-retryable error {e.status} after {total_attempts} attempts: {e}"
                )
                raise

            # Count this against backoff attempts
            backoff_attempts += 1
            if backoff_attempts >= max_backoff_attempts:
                self.log.error(
                    f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
                )
                raise

            delay = backoff_delay(backoff_attempts)
            self.log.debug(
                f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
                f"after {delay:.2f}s for status {e.status}"
            )
            time.sleep(delay)

        except Exception as e:
            # Network errors or other exceptions - retry with backoff
            total_attempts += 1
            backoff_attempts += 1

            if backoff_attempts >= max_backoff_attempts:
                self.log.error(
                    f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
                )
                raise

            delay = backoff_delay(backoff_attempts)
            self.log.debug(
                f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
                f"after {delay:.2f}s: {e}"
            )
            time.sleep(delay)
45 changes: 39 additions & 6 deletions segment/analytics/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from gzip import GzipFile
import logging
import json
import base64
from dateutil.tz import tzutc
from requests.auth import HTTPBasicAuth
from requests import sessions
Expand All @@ -12,8 +13,31 @@

_session = sessions.Session()

# Maximum Retry-After delay to respect (5 minutes)
MAX_RETRY_AFTER_SECONDS = 300

def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, **kwargs):

def parse_retry_after(response, max_seconds=None):
    """
    Parse the Retry-After header from a response.

    Both forms of the header are understood: a non-negative integer
    number of seconds, and an HTTP-date, which is converted to the number
    of whole seconds remaining until that moment (0 if it is already in
    the past).

    :param response: object exposing a ``headers`` mapping with ``get``.
    :param max_seconds: upper bound for the returned delay. Defaults to
        the module-level MAX_RETRY_AFTER_SECONDS when omitted.
    :return: the delay in seconds as an int, capped at ``max_seconds``,
        or None if the header is absent, negative, or unparseable.
    """
    # Local imports keep this function self-contained without touching the
    # module's import block.
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    if max_seconds is None:
        max_seconds = MAX_RETRY_AFTER_SECONDS

    retry_after = response.headers.get('Retry-After')
    if not retry_after:
        return None

    try:
        # Try parsing as integer (delay in seconds)
        delay = int(retry_after)
    except ValueError:
        pass  # Not an integer; try the HTTP-date form below.
    else:
        if delay < 0:
            # A negative delay is not a valid header value, and passing it
            # on to time.sleep() would raise.
            return None
        return min(delay, max_seconds)

    try:
        retry_at = parsedate_to_datetime(retry_after)
    except (TypeError, ValueError):
        # Depending on the Python version, an unparseable date raises one
        # or the other.
        return None

    if retry_at.tzinfo is None:
        # Dates without a usable zone are interpreted as UTC.
        retry_at = retry_at.replace(tzinfo=timezone.utc)

    remaining = (retry_at - datetime.now(timezone.utc)).total_seconds()
    return min(max(0, math.ceil(remaining)), max_seconds)


def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, retry_count=0, **kwargs):
"""Post the `kwargs` to the API"""
log = logging.getLogger('segment')
body = kwargs
Expand All @@ -28,10 +52,18 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag
log.debug('making request: %s', data)
headers = {
'Content-Type': 'application/json',
'User-Agent': 'analytics-python/' + VERSION
'User-Agent': 'analytics-python/' + VERSION,
'X-Retry-Count': str(retry_count)
}

# Add Authorization header - prefer OAuth Bearer token, fallback to Basic auth
if auth:
headers['Authorization'] = 'Bearer {}'.format(auth)
else:
# Basic auth with write key (format: "writeKey:" encoded in base64)
credentials = '{}:'.format(write_key)
encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
headers['Authorization'] = 'Basic {}'.format(encoded)

if gzip:
headers['Content-Encoding'] = 'gzip'
Expand Down Expand Up @@ -60,24 +92,25 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag
log.debug('data uploaded successfully')
return res

if oauth_manager and res.status_code in [400, 401, 403]:
if oauth_manager and res.status_code in [400, 401, 403, 511]:
oauth_manager.clear_token()

try:
payload = res.json()
log.debug('received response: %s', payload)
raise APIError(res.status_code, payload['code'], payload['message'])
raise APIError(res.status_code, payload['code'], payload['message'], res)
except ValueError:
log.error('Unknown error: [%s] %s', res.status_code, res.reason)
raise APIError(res.status_code, 'unknown', res.text)
raise APIError(res.status_code, 'unknown', res.text, res)


class APIError(Exception):

def __init__(self, status, code, message):
def __init__(self, status, code, message, response=None):
self.message = message
self.status = status
self.code = code
self.response = response

def __str__(self):
msg = "[Segment] {0}: {1} ({2})"
Expand Down
Loading