Table of Contents
- Why Asyncio?
- Real-World Use Cases
- Async vs Parallel
- Semaphores for Rate Limiting
- ThreadPoolExecutor vs Asyncio
- Production Patterns
- Anti-Ban Scraping Strategies
- Summary & Best Practices
Why Asyncio?
Primary Use Case: I/O-Bound Operations
Perfect for network requests, database queries, file I/O where the CPU waits for external resources.
Example:
import asyncio
from curl_cffi.requests import AsyncSession

@app.post("/process")  # app is a FastAPI instance
async def process_items(items: list[str]):
    # Fetch data for all items (N API calls fired concurrently)
    # fetch_item is assumed to be defined elsewhere
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_item(session, item_id) for item_id in items]
        return await asyncio.gather(*tasks)
Performance:
- Without asyncio: 200ms × 3 items = 600ms
- With asyncio: 200ms total (all concurrent)
Benefits:
- Single-threaded, no race conditions
- Minimal memory overhead (~1-2KB per task)
- Can handle 10,000+ concurrent connections
- Explicit await points show where context switches occur
Real-World Use Cases
1. Web Scraping (High Concurrency)
Scraping thousands of pages with rate limiting and anti-detection measures.
Use case: Price monitoring, data aggregation, market research
Scale: 1,000-100,000 URLs
Pattern: Semaphore + random delays + session rotation
2. API Gateway Pattern
Fan-out requests to multiple microservices and aggregate results.
Use case: User dashboard aggregating auth, profile, orders, analytics
Scale: 5-20 concurrent service calls per request
Pattern: asyncio.gather() with timeout handling
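A minimal sketch of the fan-out, assuming fetch_auth, fetch_profile, and fetch_orders are async client calls you already have; each one is capped with asyncio.wait_for:
import asyncio

async def load_dashboard(user_id: str) -> dict:
    # Fan out to the services concurrently; cap each call at 2 seconds
    auth, profile, orders = await asyncio.gather(
        asyncio.wait_for(fetch_auth(user_id), timeout=2.0),
        asyncio.wait_for(fetch_profile(user_id), timeout=2.0),
        asyncio.wait_for(fetch_orders(user_id), timeout=2.0),
        return_exceptions=True,  # a slow or failing service returns an exception instead of killing the page
    )
    return {"auth": auth, "profile": profile, "orders": orders}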
3. Database Query Optimization
Parallel database queries when data isn't interdependent.
Use case: Dashboard loading multiple metrics simultaneously
Scale: 10-50 concurrent queries
Pattern: asyncio.gather() with connection pooling
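A sketch of the same idea with asyncpg (the table names and queries are invented for illustration):
import asyncio
import asyncpg

async def load_metrics(pool: asyncpg.Pool) -> dict:
    # Each query checks out its own connection from the pool,
    # so the round-trips overlap instead of running back to back.
    async def one(query: str):
        async with pool.acquire() as conn:
            return await conn.fetchval(query)

    users, orders, revenue = await asyncio.gather(
        one("SELECT count(*) FROM users"),
        one("SELECT count(*) FROM orders"),
        one("SELECT coalesce(sum(total), 0) FROM orders"),
    )
    return {"users": users, "orders": orders, "revenue": revenue}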
4. File Operations at Scale
Concurrent file uploads/downloads/processing.
Use case: Batch image processing, S3 operations, log aggregation
Scale: 100-10,000 files
Pattern: Semaphore-limited async file I/O
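One way to sketch this with the third-party aiofiles package (the paths are placeholders):
import asyncio
import aiofiles

async def read_file(path: str, semaphore: asyncio.Semaphore) -> tuple[str, int]:
    async with semaphore:  # cap how many files are open at once
        async with aiofiles.open(path, "rb") as f:
            data = await f.read()
    return path, len(data)

async def read_all(paths: list[str]) -> list[tuple[str, int]]:
    semaphore = asyncio.Semaphore(100)
    return await asyncio.gather(*(read_file(p, semaphore) for p in paths))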
5. Real-Time Data Streaming
WebSocket connections for live updates.
Use case: Chat systems, live dashboards, notification services
Scale: 1,000-100,000 concurrent connections
Pattern: Async WebSocket handlers with message queues
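Since the earlier example already uses FastAPI, a bare-bones echo endpoint in the same style shows the shape of this pattern (the route path is illustrative):
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # One coroutine per client; the event loop multiplexes thousands of them
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"echo: {message}")
    except WebSocketDisconnect:
        pass  # Client closed the connection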
6. External API Aggregation
Calling multiple external APIs and combining results.
Use case: Flight booking (compare prices across airlines), weather aggregation
Scale: 3-50 concurrent API calls
Pattern: asyncio.gather() with exponential backoff
Async vs Parallel: The Critical Difference
Async = Concurrency (Interleaved)
Definition: Multiple tasks make progress by switching between them during idle time.
Execution: Single-threaded, one task runs at a time.
Use for: I/O-bound tasks (network, disk, database)
async def main():
    # These don't run in parallel; they're interleaved:
    # when task1 waits for I/O, task2 runs
    results = await asyncio.gather(
        fetch_url("https://api1.com"),  # Waits for network
        fetch_url("https://api2.com"),  # Runs while api1 waits
        fetch_url("https://api3.com"),  # Runs while api1/2 wait
    )
Timeline:
Thread 1: [task1-send]---[wait]------[task1-receive]
          [task2-send]---[wait]------[task2-receive]
          [task3-send]---[wait]------[task3-receive]
Parallel = True Parallelism (Simultaneous)
Definition: Multiple tasks run simultaneously on different CPU cores.
Execution: Multi-threaded/multi-process.
Use for: CPU-bound tasks (calculations, data processing)
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i**2 for i in range(n))

# Note: wrap in `if __name__ == "__main__":` when running as a script on macOS/Windows
with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(cpu_task, [1000000] * 4)
Timeline:
CPU Core 1: [task1-compute-compute-compute-done]
CPU Core 2: [task2-compute-compute-compute-done]
CPU Core 3: [task3-compute-compute-compute-done]
CPU Core 4: [task4-compute-compute-compute-done]
Semaphores for Rate Limiting
The Problem: Uncontrolled Concurrency
Without semaphores, asyncio.gather() fires ALL requests instantly → rate limits & bot detection.
# BAD: All 1000 requests fire instantly
urls = [f"https://api.com/product/{i}" for i in range(1000)]
async with AsyncSession() as session:
    tasks = [session.get(url) for url in urls]
    results = await asyncio.gather(*tasks)  # 1000 requests in 0.01s → BAN
The Solution: Semaphore + Sleep
A semaphore is a concurrency primitive that limits how many tasks can run simultaneously.
semaphore = asyncio.Semaphore(10)  # Max 10 tasks at once

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # Waits if 10 tasks are already running
        try:
            response = await session.get(url)
            return {"url": url, "data": response.json()}
        except Exception as e:
            return {"url": url, "error": str(e)}
        finally:
            await asyncio.sleep(0.5)  # Pace each worker

async def scrape_all(urls):
    semaphore = asyncio.Semaphore(10)
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
Think of it like a bouncer at a club: Only 10 people (tasks) can be inside (running) at once. When someone leaves, the next person enters.
Why BOTH Semaphore AND Sleep?
Semaphore alone:
- Limits concurrent connections
- Each worker fires next request instantly after completion
- Pattern: 10 requests → 10 complete → 10 more instantly (burst)
Semaphore + sleep:
- Limits concurrent AND paces each worker
- Pattern: 10 requests → staggered completion → gradual ramp up (natural)
Result: Traffic looks human—server sees 20 req/sec spread over time, not bursts.
When You Need Semaphores
Use semaphores when:
- Scraping >50 URLs from same domain
- API has rate limits (explicit or behavioral)
- Want to avoid 429 errors and IP bans
- Need to control resource usage
Skip semaphores when:
- Calling your own API (no rate limits)
- Small number of requests (less than 10)
- API explicitly welcomes bulk access
Tuning Semaphore Values
# Conservative: 10 req/sec (very safe)
semaphore = asyncio.Semaphore(5)
await asyncio.sleep(0.5)
# Balanced: 20 req/sec (recommended starting point)
semaphore = asyncio.Semaphore(10)
await asyncio.sleep(0.5)
# Aggressive: 40 req/sec (may trigger detection)
semaphore = asyncio.Semaphore(20)
await asyncio.sleep(0.5)
Throughput examples:
- Conservative: 1000 URLs in ~100 seconds
- Balanced: 1000 URLs in ~50 seconds
- Aggressive: 1000 URLs in ~25 seconds
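These estimates follow from a simple relation: throughput ≈ concurrency ÷ time per request cycle (sleep plus response time). A quick check with the sleep treated as the dominant cost:
concurrency = 10                       # Semaphore(10)
cycle_time = 0.5                       # seconds per request per worker
throughput = concurrency / cycle_time  # ≈ 20 requests/sec
total_time = 1000 / throughput         # ≈ 50 s for 1000 URLs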
Strategy: Start with Semaphore(10), monitor for 429s, then adjust.
ThreadPoolExecutor vs Asyncio
ThreadPoolExecutor: Real OS Threads
from concurrent.futures import ThreadPoolExecutor
import requests
with ThreadPoolExecutor(max_workers=10) as executor:
    urls = ["https://api1.com", "https://api2.com", ...]
    results = executor.map(requests.get, urls)
Pros:
- Works with blocking libraries (requests, boto3, psycopg2)
- No code rewrite needed
- Parallelism for work that releases the GIL (I/O, NumPy, C extensions); pure-Python CPU code is still serialized by the GIL
Cons:
- Each thread = ~8MB memory (48GB ≈ 6000 threads max)
- Context switching overhead
- Limited by GIL for Python CPU code
- Race conditions, deadlocks possible
Asyncio: Event Loop
import asyncio
from curl_cffi.requests import AsyncSession
async with AsyncSession() as session:
    tasks = [session.get(url) for url in urls]
    results = await asyncio.gather(*tasks)
Pros:
- Each task = ~few KB memory (10,000+ tasks easily)
- No GIL issues (single-threaded)
- No race conditions
- Much faster context switching
Cons:
- Must use async libraries (curl_cffi, aiohttp, asyncpg)
- Can't mix blocking code (blocks entire loop)
- Requires async/await throughout codebase
Performance Comparison
Rough, order-of-magnitude figures:
- ThreadPoolExecutor: 100 threads = ~800MB RAM, ~100-1000 req/sec
- Asyncio: 10,000 tasks = ~100MB RAM, ~10,000+ req/sec
Asyncio wins decisively for I/O-bound tasks.
When to Use ThreadPoolExecutor
- Working with blocking libraries (for mixing them into an async app, see the asyncio.to_thread sketch after this list):
  # boto3 is sync-only; run S3 reads on a thread pool
  with ThreadPoolExecutor(max_workers=20) as executor:
      results = executor.map(lambda key: s3.get_object(Bucket=bucket, Key=key), keys)
- CPU-bound tasks (though ProcessPoolExecutor is usually the better fit)
- Quick parallelism without rewriting code to async
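And if the rest of the app is already async, you can keep the blocking library and push only its calls onto a thread pool. A minimal sketch using the standard library's asyncio.to_thread (Python 3.9+); the boto3 client and bucket name are placeholders:
import asyncio
import boto3

s3 = boto3.client("s3")

async def get_object(key: str) -> bytes:
    def _download() -> bytes:
        # Blocking boto3 call plus body read, both kept off the event loop
        response = s3.get_object(Bucket="my-bucket", Key=key)
        return response["Body"].read()
    return await asyncio.to_thread(_download)

async def fetch_keys(keys: list[str]) -> list[bytes]:
    return await asyncio.gather(*(get_object(k) for k in keys))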
When to Use Asyncio
- I/O-bound tasks (network, disk, database)
- High concurrency (thousands of connections)
- Modern libraries support it (curl_cffi, aiohttp, asyncpg)
- WebSockets, streaming, real-time apps
macOS Thread Limits (M4 MacBook)
Theoretical:
- 48GB RAM ÷ 8MB/thread = 6,000 threads
Reality:
- macOS per-process limit: ~2,048 threads
- System-wide limit: ~5,120 threads
- Practical: 50-200 threads for I/O tasks
Asyncio: Can handle 10,000-50,000 concurrent tasks on same hardware.
Production Patterns
1. Basic Pattern with Error Handling
async def fetch_with_error_handling(session, url, semaphore):
    async with semaphore:
        try:
            response = await session.get(url)
            return {"url": url, "status": "success", "data": response.json()}
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}
        finally:
            await asyncio.sleep(0.5)

async def scrape_all(urls):
    semaphore = asyncio.Semaphore(10)
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_with_error_handling(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = [r for r in results if r.get("status") == "success"]
        failed = [r for r in results if r.get("status") == "error"]
        print(f"✓ Success: {len(successful)}, ✗ Failed: {len(failed)}")
        return successful
2. Production Pattern with All Features
from tqdm.asyncio import tqdm
import asyncio
import random
from curl_cffi.requests import AsyncSession

async def fetch_with_retry(session, url, semaphore, max_retries=3):
    """
    Production-ready fetch with:
    - Rate limiting (semaphore)
    - Exponential backoff for 429 errors
    - Per-request error handling
    - Random delays
    """
    async with semaphore:
        for attempt in range(max_retries):
            try:
                response = await session.get(url, timeout=15)
                # Handle rate limiting
                if response.status_code == 429:
                    wait_time = (2 ** attempt) * 2  # 2s, 4s, 8s
                    print(f"Rate limited! Waiting {wait_time}s...")
                    await asyncio.sleep(wait_time)
                    continue
                if response.status_code != 200:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                        continue
                    return {"url": url, "status": "error", "code": response.status_code}
                return {"url": url, "status": "success", "data": response.json()}
            except Exception as e:
                if attempt == max_retries - 1:
                    return {"url": url, "status": "error", "error": str(e)}
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            finally:
                # Random delay mimics human behavior
                delay = random.uniform(0.3, 0.7)
                await asyncio.sleep(delay)
        # All retries exhausted (e.g. repeated 429s)
        return {"url": url, "status": "error", "error": "max retries exceeded"}

async def scrape_all_production(urls):
    semaphore = asyncio.Semaphore(10)
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_with_retry(session, url, semaphore) for url in urls]
        # tqdm.gather adds a progress bar
        results = await tqdm.gather(*tasks, desc="Scraping")
        successful = [r for r in results if r.get("status") == "success"]
        failed = [r for r in results if r.get("status") == "error"]
        print(f"\n✓ Success: {len(successful)}, ✗ Failed: {len(failed)}")
        return successful
3. Asyncio Patterns Comparison
asyncio.gather() - Most Common
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
Pros: Returns results in order, return_exceptions=True prevents one failure killing all
Cons: Waits for ALL tasks (slowest determines total time), no progress visibility
Use when: Need all results, order matters
asyncio.as_completed() - Process as They Finish
tasks = [fetch(url) for url in urls]
for coro in asyncio.as_completed(tasks):
    result = await coro
    process_immediately(result)  # Don't wait for all
Pros: Process results immediately, easy progress bar, better for streaming
Cons: Results unordered, more verbose
Use when: Want to process/save results as they complete
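On the "easy progress bar" point: as_completed returns a plain iterator, so the regular tqdm wrapper is enough (assuming tqdm is installed and this fragment runs inside an async function):
from tqdm import tqdm

for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
    result = await coro
    process_immediately(result)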
asyncio.TaskGroup - Python 3.11+
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(fetch(url)) for url in urls]
    # Auto cleanup on exception
Pros: Structured concurrency, automatic cleanup, Pythonic
Cons: Python 3.11+ only, all-or-nothing (one exception cancels all)
Use when: On Python 3.11+ and want clean error handling
Anti-Ban Scraping Strategies
The Multi-Layer Defense
eBay and similar sites use multiple detection methods:
- TLS Fingerprinting - Analyzes TLS handshake
- Behavioral Analysis - Request patterns, timing, velocity
- Rate Limiting - IP-based and session-based limits
- Browser Fingerprinting - JavaScript challenges, canvas, WebGL
Defense Strategy:
# Layer 1: TLS Fingerprinting Bypass
from curl_cffi.requests import AsyncSession
session = AsyncSession(impersonate="chrome110")  # TLS handshake identical to real Chrome

# Layer 2: Behavioral Detection Bypass
semaphore = asyncio.Semaphore(10)  # Controlled velocity
await asyncio.sleep(random.uniform(0.3, 0.7))  # Random human-like delays

# Layer 3: Session Rotation
# Refresh the session every 50 products (new fingerprint)
for batch in batches(products, 50):
    async with AsyncSession(impersonate="chrome110") as session:
        ...  # Fresh browser session for each batch
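batches above isn't a library function; a minimal sketch of the chunking helper it assumes:
def batches(items: list, size: int):
    # Yield consecutive slices of `size` items; the last slice may be shorter
    for start in range(0, len(items), size):
        yield items[start:start + size]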
Complete Anti-Ban Pattern
import asyncio
import random
from datetime import datetime

import pandas as pd
from curl_cffi.requests import AsyncSession

async def scrape_with_anti_ban(urls: list[str]) -> list[dict]:
    """
    Complete anti-ban scraping implementation.
    Safety features:
    1. Browser impersonation (curl_cffi)
    2. Semaphore rate limiting (10 concurrent)
    3. Random delays (0.3-0.7s per request)
    4. Exponential backoff on errors
    5. Session refresh every 50 URLs
    6. Checkpoint saves for resume capability
    """
    all_results = []
    checkpoint_file = f'checkpoint_{datetime.now():%Y%m%d_%H%M%S}.csv'
    # Process in batches of 50 for session rotation
    for batch_start in range(0, len(urls), 50):
        batch = urls[batch_start:batch_start + 50]
        print(f"Batch {batch_start//50 + 1}: URLs {batch_start+1}-{batch_start+len(batch)}")
        print("Refreshing session for security...")
        # Fresh session every 50 URLs
        async with AsyncSession(impersonate="chrome110") as session:
            semaphore = asyncio.Semaphore(10)
            tasks = []
            for url in batch:
                task = fetch_with_retry(session, url, semaphore, max_retries=3)
                tasks.append(task)
            batch_results = []
            for coro in asyncio.as_completed(tasks):
                result = await coro
                batch_results.append(result)
                # Random delay between processing results
                await asyncio.sleep(random.uniform(1.0, 2.0))
        all_results.extend(batch_results)
        # Checkpoint save after every batch
        checkpoint_df = pd.DataFrame(all_results)
        checkpoint_df.to_csv(checkpoint_file, index=False)
        print(f"Checkpoint saved: {len(all_results)} total")
    return all_results
Safety Features Explained
1. Browser Impersonation (curl_cffi)
- Makes TLS handshake identical to real Chrome/Firefox
- Bypasses TLS fingerprinting (primary detection method)
- Mandatory for Cloudflare-protected sites
2. Semaphore Limiting
- Controls request velocity (not just volume)
- Prevents burst patterns that trigger behavioral analysis
- Start with 10, adjust based on 429 responses
3. Random Delays
- Mimics human pause/think time
- Prevents perfect timing patterns
- Use ranges: 0.3-0.7s per request, 1-2s between products
4. Exponential Backoff
- On errors: wait 1s, 2s, 4s between retries
- On 429 errors: wait 2s, 4s, 8s (longer)
- Shows respect, reduces hammering detection
5. Session Rotation
- New browser fingerprint every N requests
- Prevents long-running session detection
- Refresh every 50 products is good balance
6. Checkpoint Saves
- Resume capability if banned/interrupted
- Don't lose progress
- Save every 50 products or 10 minutes
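The batch code above saves once per batch; if you also want the "every 10 minutes" rule, a small count-or-time helper along these lines works (the class and its parameters are illustrative, not from any library):
import time
import pandas as pd

class Checkpointer:
    def __init__(self, path: str, every_n: int = 50, every_seconds: int = 600):
        self.path = path
        self.every_n = every_n
        self.every_seconds = every_seconds
        self.rows: list[dict] = []
        self._since_save = 0
        self._last_save = time.monotonic()

    def add(self, row: dict) -> None:
        self.rows.append(row)
        self._since_save += 1
        # Flush on either trigger: N new rows or the time window elapsing
        if self._since_save >= self.every_n or time.monotonic() - self._last_save >= self.every_seconds:
            self.flush()

    def flush(self) -> None:
        pd.DataFrame(self.rows).to_csv(self.path, index=False)
        self._since_save = 0
        self._last_save = time.monotonic()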
Performance Tuning
Conservative (Very Safe):
- Semaphore(5), sleep(0.5-1.0s), session refresh every 30
- 1000 URLs in ~100 seconds
- Risk: Very low
Balanced (Recommended):
- Semaphore(10), sleep(0.3-0.7s), session refresh every 50
- 1000 URLs in ~40 seconds
- Risk: Low
Aggressive (Higher Risk):
- Semaphore(20), sleep(0.2-0.5s), session refresh every 100
- 1000 URLs in ~20 seconds
- Risk: Medium
Monitoring & Adjustment
Watch for these signals:
- 429 status codes → decrease semaphore, increase delays
- Connection resets → decrease semaphore
- 403 after successful requests → you've been detected, wait 1hr
Adaptive strategy:
error_count = 0
for result in results:
    if result.get("code") == 429:  # fetch_with_retry stores the HTTP status under "code"
        error_count += 1
if error_count > 5:
    # Halve the limit for the next batch (never below 3); only newly created tasks see it
    semaphore = asyncio.Semaphore(max(semaphore._value // 2, 3))
    print(f"Too many 429s! Reducing to Semaphore({semaphore._value})")
Summary & Best Practices
Key Takeaways
- Asyncio for I/O-bound tasks - 10-100x better than threads for network operations
- Semaphores are mandatory for rate limiting when scraping multiple URLs
- Semaphore + sleep together create natural traffic patterns
- curl_cffi bypasses TLS detection, semaphores bypass behavioral detection
- Session rotation prevents long-running session detection
- Checkpoint saves provide resume capability
Decision Tree
Is it I/O-bound (network/disk/DB)?
├─ Yes → Use asyncio
│ ├─ Scraping external site?
│ │ ├─ Yes → Use semaphore + random delays + session rotation
│ │ └─ No → Plain asyncio.gather() is fine
│ └─ Need async library?
│ ├─ Available → Use it (curl_cffi, aiohttp, asyncpg)
│ └─ Not available → Use ThreadPoolExecutor wrapper
└─ No (CPU-bound) → Use ProcessPoolExecutor
Production Checklist
For any scraping project:
- ✓ curl_cffi with browser impersonation
- ✓ Semaphore for rate limiting (start with 10)
- ✓ Random delays (0.3-0.7s per request)
- ✓ Exponential backoff on failures
- ✓ 429 error handling with longer waits
- ✓ Session rotation every 50 requests
- ✓ Checkpoint saves for resume capability
- ✓ Progress bar for visibility (tqdm)
- ✓ Success/failure reporting
- ✓ Logging for debugging
For production systems:
- ✓ All above, plus:
- ✓ Proxy rotation (optional but recommended)
- ✓ Monitoring dashboard
- ✓ Alert on high failure rates
- ✓ Auto-pause on sustained 429s
- ✓ Database persistence (not just CSV)
- ✓ Scheduled runs (weekly/daily)
Final Recommendation
Your scraping workflow should be:
# Weekly: Full scrape
async with AsyncSession(impersonate="chrome110") as session:
semaphore = asyncio.Semaphore(10)
results = await scrape_with_checkpoints(
session, urls, semaphore,
batch_size=50, # Session refresh interval
checkpoint_interval=50,
random_delay=(0.3, 0.7)
)
This pattern:
- Looks human (random delays, natural patterns)
- Respects rate limits (semaphore + exponential backoff)
- Survives interruptions (checkpoints)
- Avoids detection (TLS impersonation + session rotation)
- Scales well (10,000+ URLs no problem)
- Runs fast (~20-40 requests/sec sustained, depending on the semaphore setting)
Risk level: Very low when all features combined correctly.