Gonna get straight into it, this is a WIP: a post describing some decisions I've been making while building out a teaching platform.
We have a FastAPI backend that handles CRUD operations but also triggers processing pipelines.
At first glance it's all good: we're using asyncio and taking advantage of the FastAPI/Uvicorn event loop for concurrent request handling.
There is an endpoint where teachers upload assessment PDFs. This needs to do a few things:
- Grade an assessment - this is user-facing, the teacher is waiting for results
- Extract questions for our question bank - this is background work, teacher doesn't care about it
- Another long-running OCR task - also background
Right now it looks something like:
@router.post("/assessments")
async def upload_assessment(file: UploadFile, class_id: int):
    s3_key = await s3.upload(file)

    # Trigger pipelines
    grading_task = asyncio.create_task(
        grade_assessment_with_gemini(s3_key, class_id)  # 15-30s per paper
    )
    extraction_task = asyncio.create_task(
        extract_questions_to_bank(s3_key, class_id)  # 10-20s per paper
    )
    long_ocr_task = asyncio.create_task(
        long_running_ocr(s3_key, class_id)  # 3 minutes per paper
    )

    # Wait for grading (user needs this)
    results = await grading_task

    # Question extraction and long OCR run in background (fire and forget)
    return {"results": results}
This works fine at low traffic. But it couples processing to the API and causes problems as we scale. Behind the scenes, despite not being awaited, those other tasks are still running on your FastAPI worker's event loop: still taking CPU cycles, still consuming memory, still competing with your incoming requests for resources.
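To make that concrete, here's a toy sketch (pure asyncio, no FastAPI; the blocking sleep is an exaggerated stand-in for CPU-heavy work like PDF parsing) showing a fire-and-forget task delaying a handler that should return instantly:

import asyncio
import time

async def background_task():
    # Stand-in for CPU-heavy work inside an async task: while this runs,
    # nothing else on the event loop can make progress
    time.sleep(2)

async def request_handler():
    start = time.monotonic()
    await asyncio.sleep(0)  # yield to the loop, as any real handler would
    print(f"handler waited {time.monotonic() - start:.2f}s for the loop")

async def main():
    asyncio.create_task(background_task())  # fire and forget
    await request_handler()                 # prints ~2.00s, not ~0.00s

asyncio.run(main())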
Before we can do anything drastic here we need to first address the tight coupling.
We introduce a new layer, services, for separation and decoupling.
So instead of our routes calling Azure/Gemini or hitting S3, we create:
- services/grading.py - handles all grading logic
- services/question_extraction.py - handles question bank logic
- services/storage.py - wraps S3 operations
This allows for better compartmentalisation of purpose, and our routes can act as a thin orchestration layer. Testing becomes easier: we can mock the service layer to isolate the API. We can swap implementations or services without having to touch the routes.
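To sketch the shape of it (file names from the list above; gemini_client is whatever wrapper you already have around Gemini, shown here purely as an assumption):

# services/grading.py
async def grade(s3_key: str, class_id: int) -> dict:
    return await gemini_client.grade(s3_key, class_id)

# routes/assessments.py - the route is now a thin orchestration layer
from services import grading, storage

@router.post("/assessments")
async def upload_assessment(file: UploadFile, class_id: int):
    s3_key = await storage.upload(file)
    results = await grading.grade(s3_key, class_id)
    return {"results": results}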
"This is obvious" - some of you are serial vibecoders, believe me my blog posts will break down all concepts from this point on.
Currently these services still run on the same instance as the API. They compete for the same memory, the same CPU, the same event loop. When we start hitting the API hard enough to schedule these pipelines constantly, we'll have all sorts of issues: resource contention, pipeline tasks piling up, API request handling slowing down.
At a few requests per minute, this is perfectly fine. The event loop has capacity and tasks complete before the next batch arrives. Deployment remains simple, code stays clean, no complexity, no overhead. Lovely.
But then traffic picks up. A new upload request every second or two. You spin up more Uvicorn workers - 4, maybe 8. Each worker gets its own event loop, we have a load balancer that distributes evenly. Except now you start seeing weird stuff:
- Memory pressure - tasks are queuing up faster than they complete
- Uneven load - Worker A has 40 extraction tasks queued, Worker B has 5 (load balancer doesn't know about background task load)
- Latency creep - grading requests that used to take 20s are now taking 35s, even though nothing changed in the grading logic itself
Background tasks are stealing event loop time from request handlers. They're on the same worker, same event loop, competing for the same resources.
And you still have the basics missing:
- If the server restarts mid-extraction -> Work is GONE
- Tasks could fail silently, you have no idea
- No backpressure mechanism, tasks just pile up until workers OOM
Push to 10-20 req/s and it falls apart completely. Event loops saturated, 10,000+ tasks in memory, workers crashing and taking all their queued work with them. System becomes unresponsive.
FastAPI, Uvicorn, asyncio, aiohttp and friends give you great primitives, but the operational problems are yours to solve.
The obvious solution is to stop processing on the API entirely; we already do our grading on a Lambda. The API should be for routing, metadata and invoking other services. So let's say we move the processing to another service. What's wrong with this? How do we connect these services? Why do we need a queue here?
I typically skip considering a DB as a proto-queue. It gives you persistence, monitoring and retry logic, but it's just hideous: having workers poll the DB repeatedly becomes a bottleneck, and honestly it just isn't cool.
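For completeness, this is roughly what the proto-queue looks like: a hypothetical jobs table polled with asyncpg, using SKIP LOCKED so two workers can't claim the same row. It works, but every idle worker is hammering the DB on a timer:

import asyncio
import asyncpg

async def poll_jobs(dsn: str):
    conn = await asyncpg.connect(dsn)
    while True:
        async with conn.transaction():
            # Claim one pending job; SKIP LOCKED stops workers colliding
            job = await conn.fetchrow(
                """UPDATE jobs SET status = 'running'
                   WHERE id = (SELECT id FROM jobs WHERE status = 'pending'
                               ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED)
                   RETURNING id, s3_key"""
            )
        if job:
            await process_job(job["id"], job["s3_key"])  # your handler goes here
        else:
            await asyncio.sleep(1)  # the polling that becomes the bottleneck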
In previous projects, I've typically used SQS for this kind of thing. The API receives a request, writes a message to SQS, returns immediately (good). Separate consumer processes pull from SQS and do the actual work. Super clean separation, durable queue with nice logging and monitoring (Cloudwatch), you get dead letter queues, you can auto-scale the consumers, all good. It's also fully managed, you'll see what I mean by what needs to be managed soon.
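The SQS shape, roughly (boto3; the queue URL is a placeholder and handle() stands in for whatever your processing logic is):

import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.eu-west-2.amazonaws.com/123456789012/assessment-uploads"

# Producer side - the API writes a message and returns immediately
def publish_upload(s3_key: str, class_id: int) -> None:
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"s3_key": s3_key, "class_id": class_id}),
    )

# Consumer side - a separate process long-polls and does the actual work
def consume_forever():
    while True:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=20
        )
        for msg in resp.get("Messages", []):
            handle(json.loads(msg["Body"]))
            # Deleting the message is the "ack" - skip it and SQS redelivers
            sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])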
However, SQS delivers each message to a single consumer, which deletes it once processed. In our example we need multiple consumers per message. I mean, you could batch all those processes into a single consuming service, tbh. But the solution is definitely not to have multiple queues. Instead you use a persistent message log like Kafka. Messages stay in the topic, and multiple consumer groups can read the same message independently, each processing at its own pace because each group tracks its own offset.
So the architecture becomes:
@router.post("/assessments/upload")
async def upload_assessment(file: UploadFile, class_id: int):
    s3_key = await s3.upload(file)

    # Publish event to Kafka
    await kafka_producer.send(
        topic="assessment-uploads",
        value={
            "s3_key": s3_key,
            "class_id": class_id,
            # isoformat() so the value is JSON-serializable
            "upload_timestamp": datetime.now().isoformat(),
        },
    )
    return {"job_id": s3_key, "status": "processing"}
API is done. It routes the request, publishes an event, returns. That's it.
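One thing the snippet glosses over: kafka_producer has to exist. Assuming aiokafka (and app being your FastAPI instance), you'd create it once at startup rather than per request, something like:

import json
from aiokafka import AIOKafkaProducer

kafka_producer = AIOKafkaProducer(
    bootstrap_servers="localhost:9092",
    # Serialize dict values to JSON bytes on the way out
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

@app.on_event("startup")
async def start_kafka_producer():
    await kafka_producer.start()

@app.on_event("shutdown")
async def stop_kafka_producer():
    await kafka_producer.stop()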
Then you have separate consumer services:
async def grading_consumer():
    # Own consumer, own consumer group - sees every message
    async for message in grading_kafka_consumer:
        event = message.value
        await grade_assessment(event["s3_key"], event["class_id"])

async def extraction_consumer():
    # Separate group, so it also sees every message, at its own pace
    async for message in extraction_kafka_consumer:
        event = message.value
        await extract_questions(event["s3_key"], event["class_id"])
These run as completely separate processes, on separate instances even. They scale independently. If grading needs more capacity, you add more grading consumers. If extraction is falling behind, you add extraction consumers. If you want to add a new service later, easy: add a new consumer group, with no changes to the API or existing consumers! Super clean.
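As a sketch of what one of those standalone processes might look like with aiokafka (grade_assessment being the service-layer function from earlier) - the group_id is the bit that makes grading and extraction each receive their own copy of every message:

import asyncio
import json
from aiokafka import AIOKafkaConsumer

async def run_grading_consumer():
    consumer = AIOKafkaConsumer(
        "assessment-uploads",
        bootstrap_servers="localhost:9092",
        group_id="grading",  # the extraction service would use group_id="extraction"
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    await consumer.start()
    try:
        async for message in consumer:
            event = message.value
            await grade_assessment(event["s3_key"], event["class_id"])
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(run_grading_consumer())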
And you get all the things you need:
- Durability - messages persist in Kafka and survive restarts
- Retry logic - consumers can reprocess failed messages
- Monitoring - Kafka tracks consumer lag, you can see exactly what's queued
- Backpressure - consumers pull at their own rate
- Resource isolation - API and consumers are completely separate
Obviously Kafka is more complex than fire-and-forget with create_task(). You need to run Kafka brokers, manage topics, monitor consumer lag, handle rebalancing... the list of operational overhead is quite long, actually, so a managed Kafka solution (AWS MSK) is nicer, but pricier.
Yeah.
ORRRRRR we could look at something called Temporal. Now bear with me here, as this is a recent discovery of mine. Remember how those pipeline tasks were stuck sharing event loops with the API on the same instance?
Now imagine we could schedule them on a separate event loop... on a separate instance. That's what Temporal is, in effect: a distributed asyncio event loop.
The idea is you write workflows using native Python asyncio patterns but get durability, retries, and state management for free. Your code looks like async/await but Temporal handles the distributed systems problems.
Here's the assessment upload with Temporal:
# Activities - non-deterministic work lives here
from dataclasses import dataclass

from temporalio import activity

@dataclass
class AssessmentParams:
    s3_key: str
    class_id: int

@activity.defn(name="grade-assessment")
async def grade_assessment(params: AssessmentParams) -> dict:
    return await gemini_client.grade(params.s3_key)

@activity.defn(name="extract-questions")
async def extract_questions(params: AssessmentParams) -> None:
    await question_bank.extract(params.s3_key, params.class_id)

@activity.defn(name="run-ocr")
async def run_ocr(params: AssessmentParams) -> None:
    await ocr_service.process(params.s3_key)
# Workflow - orchestration
import asyncio
from datetime import timedelta

from temporalio import workflow

@workflow.defn(name="assessment-upload")
class AssessmentWorkflow:
    @workflow.run
    async def run(self, params: AssessmentParams) -> dict:
        grading, extraction, ocr = await asyncio.gather(
            workflow.execute_activity(
                "grade-assessment",
                params,
                start_to_close_timeout=timedelta(seconds=60),
            ),
            workflow.execute_activity(
                "extract-questions",
                params,
                start_to_close_timeout=timedelta(seconds=30),
            ),
            workflow.execute_activity(
                "run-ocr",
                params,
                start_to_close_timeout=timedelta(minutes=5),
            ),
        )
        return grading
# API endpoint
from temporalio.client import Client

@router.post("/assessments/upload")
async def upload_assessment(file: UploadFile, class_id: int):
    s3_key = await s3.upload(file)

    # In practice you'd connect once at startup and reuse the client
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        "assessment-upload",
        AssessmentParams(s3_key=s3_key, class_id=class_id),
        id=f"assessment-{s3_key}",
        task_queue="assessments",
    )
    # NB: result() waits for the whole workflow, including the slow OCR -
    # to hand back grading results early you'd query the workflow instead
    results = await handle.result()
    return {"results": results}
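One piece the snippets leave out: nothing actually executes the workflow until a worker registers it and polls the task queue. A minimal worker looks something like:

# Worker process - registers workflows and activities, polls the task queue
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="assessments",
        workflows=[AssessmentWorkflow],
        activities=[grade_assessment, extract_questions, run_ocr],
    )
    await worker.run()  # runs until the process is stopped

if __name__ == "__main__":
    asyncio.run(main())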
Temporal is syntactically MORE complex than Kafka.
Look at Kafka:
await kafka_producer.send(topic="uploads", value={...})

async for message in kafka_consumer:
    await process(message.value)
Temporal requires dataclasses for params, activity decorators, workflow classes with special decorators, worker registration that knows about all your workflows and activities. Way more boilerplate.
The mental model is harder too. Kafka is just "messages go in, workers pull them out." Temporal requires understanding determinism constraints, replay semantics, event sourcing, what can go in workflow code vs activity code. If you mess up and call an API directly in a workflow instead of an activity, things break in weird ways.
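To illustrate the mistake (gemini_client as in the activities above):

from temporalio import workflow

@workflow.defn
class BadWorkflow:
    @workflow.run
    async def run(self, s3_key: str) -> dict:
        # WRONG: a direct network call in workflow code. Workflows are
        # replayed from event history, so this would fire again on every
        # replay, and its non-deterministic result breaks the replay.
        # The fix: wrap the call in an activity and execute_activity() it,
        # as in AssessmentWorkflow above.
        return await gemini_client.grade(s3_key)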
Where Temporal wins is architectural complexity. You don't need to run Kafka brokers, manage topics, handle consumer rebalancing, deploy separate consumer services. Just one Temporal server and workers that run alongside your app.
So the trade-offs are weird:
- Code complexity: Kafka simpler
- Mental model: Kafka simpler
- Architecture/ops: Temporal simpler
- Capabilities: Temporal more powerful
For the assessment upload case I'd probably use Temporal. You have interdependent steps, varying timeouts, and might want to query workflow state later. The syntax is verbose but not having to manage a Kafka cluster is worth it at moderate scale.
Looks like I'm going to be spending a lot of time looking into Temporal; can't believe I hadn't heard of it before. I guess you need to learn on the JOB for things like this!
Yeah.