Async Programming & Concurrency Patterns - Complete Guide

Python · Engineering · Async · Concurrency

January 15, 2025

13 min read

Table of Contents

  1. Why Asyncio?
  2. Real-World Use Cases
  3. Async vs Parallel
  4. Semaphores for Rate Limiting
  5. ThreadPoolExecutor vs Asyncio
  6. Production Patterns
  7. Anti-Ban Scraping Strategies

Why Asyncio?

Primary Use Case: I/O-Bound Operations

Perfect for network requests, database queries, file I/O where the CPU waits for external resources.

Example:

import asyncio
from curl_cffi.requests import AsyncSession

@app.post("/process")
async def process_items(items: list[str]):
    # Fetch data for all items (N API calls run concurrently)
    # fetch_item is an async helper defined elsewhere
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_item(session, item_id) for item_id in items]
        return await asyncio.gather(*tasks)

Performance:

  • Without asyncio (sequential): 200ms × 3 items = 600ms
  • With asyncio: ~200ms total (all three requests in flight concurrently)

Benefits:

  • Single-threaded, no race conditions
  • Minimal memory overhead (~1-2KB per task)
  • Can handle 10,000+ concurrent connections
  • Explicit await points show where context switches occur

Real-World Use Cases

1. Web Scraping (High Concurrency)

Scraping thousands of pages with rate limiting and anti-detection measures.

Use case: Price monitoring, data aggregation, market research
Scale: 1,000-100,000 URLs
Pattern: Semaphore + random delays + session rotation

2. API Gateway Pattern

Fan-out requests to multiple microservices and aggregate results.

Use case: User dashboard aggregating auth, profile, orders, analytics
Scale: 5-20 concurrent service calls per request
Pattern: asyncio.gather() with timeout handling
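
A minimal sketch of the fan-out, with hypothetical service calls and a per-call timeout so one slow service can't stall the whole dashboard:

import asyncio

async def call_service(name: str, delay: float) -> dict:
    # Hypothetical stand-in for a real microservice call
    await asyncio.sleep(delay)
    return {"service": name, "ok": True}

async def with_timeout(coro, timeout=2.0):
    # Degrade gracefully instead of blocking on the slowest service
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return {"ok": False, "error": "timeout"}

async def load_dashboard():
    return await asyncio.gather(
        with_timeout(call_service("auth", 0.1)),
        with_timeout(call_service("profile", 0.2)),
        with_timeout(call_service("orders", 0.3)),
    )

print(asyncio.run(load_dashboard()))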

3. Database Query Optimization

Parallel database queries when data isn't interdependent.

Use case: Dashboard loading multiple metrics simultaneously
Scale: 10-50 concurrent queries
Pattern: asyncio.gather() with connection pooling
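
A sketch assuming the asyncpg driver and a reachable DSN; any async DB client follows the same shape:

import asyncio
import asyncpg  # assumed driver

async def load_metrics(dsn: str) -> dict:
    pool = await asyncpg.create_pool(dsn, min_size=5, max_size=10)
    try:
        # Independent queries run concurrently on separate pooled connections
        users, orders, revenue = await asyncio.gather(
            pool.fetchval("SELECT count(*) FROM users"),
            pool.fetchval("SELECT count(*) FROM orders"),
            pool.fetchval("SELECT coalesce(sum(total), 0) FROM orders"),
        )
        return {"users": users, "orders": orders, "revenue": revenue}
    finally:
        await pool.close()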

4. File Operations at Scale

Concurrent file uploads/downloads/processing.

Use case: Batch image processing, S3 operations, log aggregation
Scale: 100-10,000 files
Pattern: Semaphore-limited async file I/O
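
A minimal sketch assuming the aiofiles package, which offloads blocking file calls to worker threads:

import asyncio
import aiofiles  # assumed dependency

async def read_file(path: str, semaphore: asyncio.Semaphore) -> int:
    async with semaphore:  # cap how many files are open at once
        async with aiofiles.open(path, "rb") as f:
            data = await f.read()
    return len(data)

async def read_all(paths: list[str]) -> list[int]:
    semaphore = asyncio.Semaphore(100)
    return await asyncio.gather(*(read_file(p, semaphore) for p in paths))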

5. Real-Time Data Streaming

WebSocket connections for live updates.

Use case: Chat systems, live dashboards, notification services
Scale: 1,000-100,000 concurrent connections
Pattern: Async WebSocket handlers with message queues
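
A toy echo server as a sketch, assuming the websockets package (v11+, where handlers take a single connection argument):

import asyncio
import websockets  # assumed dependency; each connection is one cheap task

async def handler(ws):
    # The event loop multiplexes thousands of these handlers concurrently
    async for message in ws:
        await ws.send(f"echo: {message}")

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # run until cancelled

asyncio.run(main())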

6. External API Aggregation

Calling multiple external APIs and combining results.

Use case: Flight booking (compare prices across airlines), weather aggregation
Scale: 3-50 concurrent API calls
Pattern: asyncio.gather() with exponential backoff


Async vs Parallel: The Critical Difference

Async = Concurrency (Interleaved)

Definition: Multiple tasks make progress by switching between them during idle time.
Execution: Single-threaded, one task runs at a time.
Use for: I/O-bound tasks (network, disk, database)

async def main():
    # These don't run in parallel; they're interleaved:
    # when task1 waits on the network, task2 runs, and so on
    # (fetch_url is an async helper defined elsewhere)
    results = await asyncio.gather(
        fetch_url("https://api1.com"),  # Waits for network
        fetch_url("https://api2.com"),  # Runs while api1 waits
        fetch_url("https://api3.com"),  # Runs while api1/2 wait
    )

Timeline:

Thread 1: [task1-send]---[wait]------[task1-receive]
                 [task2-send]---[wait]------[task2-receive]
                         [task3-send]---[wait]------[task3-receive]

Parallel = True Parallelism (Simultaneous)

Definition: Multiple tasks run simultaneously on different CPU cores.
Execution: Multi-threaded/multi-process.
Use for: CPU-bound tasks (calculations, data processing)

from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i**2 for i in range(n))

if __name__ == "__main__":  # required on platforms that spawn workers (macOS, Windows)
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_task, [1_000_000] * 4))

Timeline:

CPU Core 1: [task1-compute-compute-compute-done]
CPU Core 2: [task2-compute-compute-compute-done]
CPU Core 3: [task3-compute-compute-compute-done]
CPU Core 4: [task4-compute-compute-compute-done]

Semaphores for Rate Limiting

The Problem: Uncontrolled Concurrency

Without semaphores, asyncio.gather() fires ALL requests instantly → rate limits & bot detection.

# BAD: All 1000 requests fire instantly
urls = [f"https://api.com/product/{i}" for i in range(1000)]
async with AsyncSession() as session:
    tasks = [session.get(url) for url in urls]
    results = await asyncio.gather(*tasks)  # 1000 requests in 0.01s → BAN

The Solution: Semaphore + Sleep

A semaphore is a concurrency primitive that limits how many tasks can run simultaneously.

import asyncio
from curl_cffi.requests import AsyncSession

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # Waits if 10 tasks are already running
        try:
            response = await session.get(url)
            return {"url": url, "data": response.json()}
        except Exception as e:
            return {"url": url, "error": str(e)}
        finally:
            await asyncio.sleep(0.5)  # Pace each worker before releasing the slot

async def scrape_all(urls):
    semaphore = asyncio.Semaphore(10)  # Max 10 tasks at once
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

Think of it like a bouncer at a club: Only 10 people (tasks) can be inside (running) at once. When someone leaves, the next person enters.

Why BOTH Semaphore AND Sleep?

Semaphore alone:

  • Limits concurrent connections
  • Each worker fires next request instantly after completion
  • Pattern: 10 requests → 10 complete → 10 more instantly (burst)

Semaphore + sleep:

  • Limits concurrent AND paces each worker
  • Pattern: 10 requests → staggered completion → gradual ramp up (natural)

Result: traffic looks human. With Semaphore(10) and a 0.5s sleep per worker, steady-state throughput is about 10 / 0.5 = 20 req/sec, spread evenly over time instead of arriving in bursts.

When You Need Semaphores

Use semaphores when:

  • Scraping >50 URLs from same domain
  • API has rate limits (explicit or behavioral)
  • Want to avoid 429 errors and IP bans
  • Need to control resource usage

Skip semaphores when:

  • Calling your own API (no rate limits)
  • Small number of requests (less than 10)
  • API explicitly welcomes bulk access

Tuning Semaphore Values

# Conservative: 10 req/sec (very safe)
semaphore = asyncio.Semaphore(5)
await asyncio.sleep(0.5)

# Balanced: 20 req/sec (recommended starting point)
semaphore = asyncio.Semaphore(10)
await asyncio.sleep(0.5)

# Aggressive: 40 req/sec (may trigger detection)
semaphore = asyncio.Semaphore(20)
await asyncio.sleep(0.5)

Approximate throughput (assuming request latency is small relative to the sleep):

  • Conservative: 1000 URLs in ~100 seconds
  • Balanced: 1000 URLs in ~50 seconds
  • Aggressive: 1000 URLs in ~25 seconds

Strategy: Start with Semaphore(10), monitor for 429s, then adjust.


ThreadPoolExecutor vs Asyncio

ThreadPoolExecutor: Real OS Threads

from concurrent.futures import ThreadPoolExecutor
import requests

urls = ["https://api1.com", "https://api2.com", ...]
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(requests.get, urls))

Pros:

  • Works with blocking libraries (requests, boto3, psycopg2)
  • No code rewrite needed
  • Parallelism for I/O and for C extensions that release the GIL (pure-Python CPU code stays serialized)

Cons:

  • Each thread reserves ~8MB of stack, so 48GB supports at most ~6,000 threads
  • Context switching overhead
  • Limited by GIL for Python CPU code
  • Race conditions, deadlocks possible

Asyncio: Event Loop

import asyncio
from curl_cffi.requests import AsyncSession

async def fetch_all(urls):
    # async with must live inside a coroutine
    async with AsyncSession() as session:
        tasks = [session.get(url) for url in urls]
        return await asyncio.gather(*tasks)

Pros:

  • Each task = ~few KB memory (10,000+ tasks easily)
  • No GIL issues (single-threaded)
  • No race conditions
  • Much faster context switching

Cons:

  • Must use async libraries (curl_cffi, aiohttp, asyncpg)
  • Can't mix blocking code (blocks entire loop)
  • Requires async/await throughout codebase

Performance Comparison

Ballpark figures (hardware-dependent):

  • ThreadPoolExecutor: 100 threads = ~800MB RAM, ~100-1000 req/sec
  • Asyncio: 10,000 tasks = ~100MB RAM, ~10,000+ req/sec

Asyncio wins decisively for I/O-bound tasks.

When to Use ThreadPoolExecutor

  1. Working with blocking libraries:

# boto3 is sync-only; threads parallelize it without a rewrite
# (assumes s3 = boto3.client("s3") and a list of keys)
bucket = "my-bucket"
with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(
        lambda key: s3.get_object(Bucket=bucket, Key=key), keys))

  2. CPU-bound tasks (though ProcessPoolExecutor is usually better)

  3. Quick parallelism without rewriting to async
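
There is also a middle ground: asyncio.to_thread() (Python 3.9+) runs a blocking call in a worker thread while the event loop stays responsive, letting you keep a sync library inside otherwise-async code:

import asyncio
import requests  # blocking client, kept off the event loop via a thread

async def fetch_blocking(url: str) -> int:
    # requests.get runs in the default thread pool; the loop keeps running
    resp = await asyncio.to_thread(requests.get, url, timeout=10)
    return resp.status_code

async def main():
    urls = ["https://example.com", "https://example.org"]
    print(await asyncio.gather(*(fetch_blocking(u) for u in urls)))

asyncio.run(main())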

When to Use Asyncio

  1. I/O-bound tasks (network, disk, database)
  2. High concurrency (thousands of connections)
  3. Modern libraries support it (curl_cffi, aiohttp, asyncpg)
  4. WebSockets, streaming, real-time apps

macOS Thread Limits (M4 MacBook)

Theoretical:

  • 48GB RAM ÷ 8MB/thread = 6,000 threads

Reality:

  • macOS per-process limit: ~2,048 threads
  • System-wide limit: ~5,120 threads
  • Practical: 50-200 threads for I/O tasks

Asyncio: Can handle 10,000-50,000 concurrent tasks on same hardware.
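
A quick way to check that claim on your own machine; each task just sleeps, standing in for network wait:

import asyncio
import time

async def tick(i: int) -> int:
    await asyncio.sleep(1)  # stand-in for I/O wait
    return i

async def main():
    start = time.perf_counter()
    await asyncio.gather(*(tick(i) for i in range(10_000)))
    print(f"10,000 concurrent tasks finished in {time.perf_counter() - start:.1f}s")

asyncio.run(main())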


Production Patterns

1. Basic Pattern with Error Handling

import asyncio
from curl_cffi.requests import AsyncSession

async def fetch_with_error_handling(session, url, semaphore):
    async with semaphore:
        try:
            response = await session.get(url)
            return {"url": url, "status": "success", "data": response.json()}
        except Exception as e:
            return {"url": url, "status": "error", "error": str(e)}
        finally:
            await asyncio.sleep(0.5)

async def scrape_all(urls):
    semaphore = asyncio.Semaphore(10)
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_with_error_handling(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # Guard with isinstance: with return_exceptions=True, results can
    # contain raw Exception objects, which have no dict methods
    successful = [r for r in results if isinstance(r, dict) and r["status"] == "success"]
    failed = [r for r in results if isinstance(r, dict) and r["status"] == "error"]
    print(f"✓ Success: {len(successful)}, ✗ Failed: {len(failed)}")
    return successful

2. Production Pattern with All Features

import asyncio
import random
from curl_cffi.requests import AsyncSession
from tqdm.asyncio import tqdm

async def fetch_with_retry(session, url, semaphore, max_retries=3):
    """
    Production-ready fetch with:
    - Rate limiting (semaphore)
    - Exponential backoff for 429 errors
    - Per-request error handling
    - Random delays
    """
    async with semaphore:
        for attempt in range(max_retries):
            try:
                response = await session.get(url, timeout=15)

                # Handle rate limiting
                if response.status_code == 429:
                    wait_time = (2 ** attempt) * 2  # 2s, 4s, 8s
                    print(f"Rate limited! Waiting {wait_time}s...")
                    await asyncio.sleep(wait_time)
                    continue

                if response.status_code != 200:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                        continue
                    return {"url": url, "status": "error", "code": response.status_code}

                return {"url": url, "status": "success", "data": response.json()}

            except Exception as e:
                if attempt == max_retries - 1:
                    return {"url": url, "status": "error", "error": str(e)}
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            finally:
                # Random delay mimics human behavior (runs after every attempt)
                await asyncio.sleep(random.uniform(0.3, 0.7))

        # All retries exhausted on 429s: report instead of silently returning None
        return {"url": url, "status": "error", "code": 429, "error": "max retries exceeded"}
async def scrape_all_production(urls):
    semaphore = asyncio.Semaphore(10)
    async with AsyncSession(impersonate="chrome110") as session:
        tasks = [fetch_with_retry(session, url, semaphore) for url in urls]
        # tqdm.gather adds progress bar
        results = await tqdm.gather(*tasks, desc="Scraping")
    
    successful = [r for r in results if r.get("status") == "success"]
    failed = [r for r in results if r.get("status") == "error"]
    print(f"\n✓ Success: {len(successful)}, ✗ Failed: {len(failed)}")
    return successful

3. Asyncio Patterns Comparison

asyncio.gather() - Most Common

tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)

Pros: Returns results in order, return_exceptions=True prevents one failure killing all
Cons: Waits for ALL tasks (slowest determines total time), no progress visibility
Use when: Need all results, order matters

asyncio.as_completed() - Process as They Finish

tasks = [fetch(url) for url in urls]
for coro in asyncio.as_completed(tasks):
    result = await coro
    process_immediately(result)  # Don't wait for all

Pros: Process results immediately, easy progress bar, better for streaming
Cons: Results unordered, more verbose
Use when: Want to process/save results as they complete

asyncio.TaskGroup - Python 3.11+

async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(fetch(url)) for url in urls]
# All tasks are done here; one exception cancels the rest automatically
results = [t.result() for t in tasks]

Pros: Structured concurrency, automatic cleanup, Pythonic
Cons: Python 3.11+ only, all-or-nothing (one exception cancels all)
Use when: On Python 3.11+ and want clean error handling


Anti-Ban Scraping Strategies

The Multi-Layer Defense

eBay and similar sites use multiple detection methods:

  1. TLS Fingerprinting - Analyzes TLS handshake
  2. Behavioral Analysis - Request patterns, timing, velocity
  3. Rate Limiting - IP-based and session-based limits
  4. Browser Fingerprinting - JavaScript challenges, canvas, WebGL

Defense Strategy:

# Layer 1: TLS Fingerprinting Bypass
from curl_cffi.requests import AsyncSession
session = AsyncSession(impersonate="chrome110")  # TLS handshake matches real Chrome

# Layer 2: Behavioral Detection Bypass
semaphore = asyncio.Semaphore(10)  # Controlled velocity
await asyncio.sleep(random.uniform(0.3, 0.7))  # Random human-like delays

# Layer 3: Session Rotation
# Refresh the session every 50 products (new fingerprint);
# batches() is a chunking helper, defined below
for batch in batches(products, 50):
    async with AsyncSession(impersonate="chrome110") as session:
        ...  # fresh browser session for this batch
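
The batches() helper above isn't in the standard library; a minimal version:

def batches(items: list, size: int):
    # Yield consecutive chunks of `size` items
    for start in range(0, len(items), size):
        yield items[start:start + size]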

Complete Anti-Ban Pattern

import asyncio
import random
from datetime import datetime

import pandas as pd
from curl_cffi.requests import AsyncSession

async def scrape_with_anti_ban(urls: list[str]) -> list[dict]:
    """
    Complete anti-ban scraping implementation

    Safety Features:
    1. Browser impersonation (curl_cffi)
    2. Semaphore rate limiting (10 concurrent)
    3. Random delays (0.3-0.7s per request)
    4. Exponential backoff on errors
    5. Session refresh every 50 URLs
    6. Checkpoint saves for resume capability

    Relies on fetch_with_retry() from the previous section.
    """
    all_results = []
    checkpoint_file = f'checkpoint_{datetime.now():%Y%m%d_%H%M%S}.csv'
    
    # Process in batches of 50 for session rotation
    for batch_start in range(0, len(urls), 50):
        batch = urls[batch_start:batch_start + 50]
        
        print(f"Batch {batch_start//50 + 1}: URLs {batch_start+1}-{batch_start+len(batch)}")
        print("Refreshing session for security...")
        
        # Fresh session every 50 URLs
        async with AsyncSession(impersonate="chrome110") as session:
            semaphore = asyncio.Semaphore(10)
            
            tasks = []
            for url in batch:
                task = fetch_with_retry(session, url, semaphore, max_retries=3)
                tasks.append(task)
            
            batch_results = []
            for coro in asyncio.as_completed(tasks):
                result = await coro
                batch_results.append(result)
                
                # Random delay between processing results
                await asyncio.sleep(random.uniform(1.0, 2.0))
            
            all_results.extend(batch_results)
            
            # Checkpoint save every batch
            checkpoint_df = pd.DataFrame(all_results)
            checkpoint_df.to_csv(checkpoint_file, index=False)
            print(f"Checkpoint saved: {len(all_results)} total")
    
    return all_results

Safety Features Explained

1. Browser Impersonation (curl_cffi)

  • Makes TLS handshake identical to real Chrome/Firefox
  • Bypasses TLS fingerprinting (primary detection method)
  • Mandatory for Cloudflare-protected sites

2. Semaphore Limiting

  • Controls request velocity (not just volume)
  • Prevents burst patterns that trigger behavioral analysis
  • Start with 10, adjust based on 429 responses

3. Random Delays

  • Mimics human pause/think time
  • Prevents perfect timing patterns
  • Use ranges: 0.3-0.7s per request, 1-2s between products

4. Exponential Backoff

  • On errors: wait 1s, 2s, 4s between retries
  • On 429 errors: wait 2s, 4s, 8s (longer)
  • Backs off politely instead of hammering, which is itself a detection signal

5. Session Rotation

  • New browser fingerprint every N requests
  • Prevents long-running session detection
  • Refresh every 50 products is good balance

6. Checkpoint Saves

  • Resume capability if banned/interrupted
  • Don't lose progress
  • Save every 50 products or 10 minutes
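
A sketch of the resume side, assuming each checkpoint is a CSV with a url column as in the code above:

import os
import pandas as pd

def remaining_urls(urls: list[str], checkpoint_file: str) -> list[str]:
    # Skip anything already recorded in the checkpoint CSV
    if not os.path.exists(checkpoint_file):
        return urls
    done = set(pd.read_csv(checkpoint_file)["url"])
    return [u for u in urls if u not in done]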

Performance Tuning

Conservative (Very Safe):

  • Semaphore(5), sleep(0.5-1.0s), session refresh every 30
  • 1000 URLs in ~150 seconds
  • Risk: Very low

Balanced (Recommended):

  • Semaphore(10), sleep(0.3-0.7s), session refresh every 50
  • 1000 URLs in ~50 seconds
  • Risk: Low

Aggressive (Higher Risk):

  • Semaphore(20), sleep(0.2-0.5s), session refresh every 100
  • 1000 URLs in ~20 seconds
  • Risk: Medium

Monitoring & Adjustment

Watch for these signals:

  • 429 status codes → decrease semaphore, increase delays
  • Connection resets → decrease semaphore
  • 403 after successful requests → you've been detected, wait 1hr

Adaptive strategy:

# Illustrative sketch: fetch_with_retry stores the HTTP code under "code",
# and Semaphore._value is a private attribute (fine for logging, not an API)
error_count = 0
for result in results:
    if result.get("code") == 429:
        error_count += 1
        if error_count > 5:
            # A smaller semaphore only affects tasks created after this point
            new_limit = max(semaphore._value // 2, 3)
            semaphore = asyncio.Semaphore(new_limit)
            print(f"Too many 429s! Reducing to Semaphore({new_limit})")

Summary & Best Practices

Key Takeaways

  1. Asyncio for I/O-bound tasks - typically 10-100x more memory-efficient than threads for network-heavy workloads
  2. Semaphores are mandatory for rate limiting when scraping multiple URLs
  3. Semaphore + sleep together create natural traffic patterns
  4. curl_cffi bypasses TLS detection, semaphores bypass behavioral detection
  5. Session rotation prevents long-running session detection
  6. Checkpoint saves provide resume capability

Decision Tree

Is it I/O-bound (network/disk/DB)?
├─ Yes → Use asyncio
│   ├─ Scraping external site?
│   │   ├─ Yes → Use semaphore + random delays + session rotation
│   │   └─ No → Plain asyncio.gather() is fine
│   └─ Need async library?
│       ├─ Available → Use it (curl_cffi, aiohttp, asyncpg)
│       └─ Not available → Use ThreadPoolExecutor wrapper
└─ No (CPU-bound) → Use ProcessPoolExecutor

Production Checklist

For any scraping project:

  • ✓ curl_cffi with browser impersonation
  • ✓ Semaphore for rate limiting (start with 10)
  • ✓ Random delays (0.3-0.7s per request)
  • ✓ Exponential backoff on failures
  • ✓ 429 error handling with longer waits
  • ✓ Session rotation every 50 requests
  • ✓ Checkpoint saves for resume capability
  • ✓ Progress bar for visibility (tqdm)
  • ✓ Success/failure reporting
  • ✓ Logging for debugging

For production systems:

  • ✓ All above, plus:
  • ✓ Proxy rotation (optional but recommended)
  • ✓ Monitoring dashboard
  • ✓ Alert on high failure rates
  • ✓ Auto-pause on sustained 429s
  • ✓ Database persistence (not just CSV)
  • ✓ Scheduled runs (weekly/daily)

Final Recommendation

Your scraping workflow should be:

# Weekly: full scrape
# scrape_with_checkpoints is a stand-in name for the pattern assembled above
# (fetch_with_retry + batch rotation + checkpoint saves), run inside an
# async entry point
async with AsyncSession(impersonate="chrome110") as session:
    semaphore = asyncio.Semaphore(10)
    results = await scrape_with_checkpoints(
        session, urls, semaphore,
        batch_size=50,  # Session refresh interval
        checkpoint_interval=50,
        random_delay=(0.3, 0.7)
    )

This pattern:

  • Looks human (random delays, natural patterns)
  • Respects rate limits (semaphore + exponential backoff)
  • Survives interruptions (checkpoints)
  • Avoids detection (TLS impersonation + session rotation)
  • Scales well (10,000+ URLs no problem)
  • Runs fast (~20-40 requests/sec sustained, depending on tuning)

Risk level: Very low when all features combined correctly.