Building Production-Ready REST APIs with FastAPI, PostgreSQL, and Docker

FastAPI is the fastest-growing Python web framework for a reason. Automatic docs, type safety, async support, and blazing performance. In this tutorial, you'll build a complete REST API from scratch and containerize it for production.
What You'll Build
A fully functional Task Management API with:
- CRUD operations for projects and tasks
- User authentication with JWT tokens
- PostgreSQL database with SQLAlchemy ORM
- Database migrations with Alembic
- Docker Compose for local development and production
- Automated API documentation (Swagger & ReDoc)
- Input validation and error handling
- Environment-based configuration
Prerequisites
Before starting, make sure you have:
- Python 3.11+ installed (python.org)
- Docker and Docker Compose (docker.com)
- Basic knowledge of Python and REST APIs
- A code editor (VS Code recommended)
- A terminal/command line
All code in this tutorial is available on GitHub. You can follow along step by step or clone the final result.
Step 1: Project Setup and Structure
Create your project directory and set up a clean structure:
mkdir taskflow-api && cd taskflow-api
mkdir -p app/{api,core,models,schemas,services}
touch app/__init__.py app/main.py app/database.py
touch app/api/__init__.py app/core/__init__.py
touch app/models/__init__.py app/schemas/__init__.py
touch app/services/__init__.py
touch requirements.txt .env .env.exampleYour project should look like this:
taskflow-api/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── database.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── projects.py
│ │ └── tasks.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py
│ │ └── security.py
│ ├── models/
│ │ ├── __init__.py
│ │ └── models.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ └── schemas.py
│ └── services/
│ ├── __init__.py
│ └── auth_service.py
├── alembic/
├── tests/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── .env
└── alembic.ini
Step 2: Install Dependencies
Add the following to requirements.txt:
fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
asyncpg==0.30.0
alembic==1.14.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.12
pydantic-settings==2.6.0
httpx==0.28.0
pytest==8.3.0
pytest-asyncio==0.24.0Create a virtual environment and install:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txtStep 3: Configuration Management
Create app/core/config.py to handle environment variables:
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# Application
APP_NAME: str = "TaskFlow API"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
# Database
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow"
# Authentication
SECRET_KEY: str = "your-secret-key-change-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS
ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
class Config:
env_file = ".env"
case_sensitive = True
@lru_cache()
def get_settings() -> Settings:
return Settings()Create your .env file:
APP_NAME=TaskFlow API
DEBUG=true
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow
SECRET_KEY=your-super-secret-key-at-least-32-characters-longNever commit your .env file to version control. Add it to .gitignore immediately. Use .env.example with placeholder values for documentation.
Step 4: Database Setup with SQLAlchemy
Create the async database connection in app/database.py:
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from app.core.config import get_settings
settings = get_settings()
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()Step 5: Define Your Models
Create app/models/models.py:
import uuid
from datetime import datetime
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
String,
Text,
Enum as SAEnum,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from app.database import Base
import enum
class TaskStatus(str, enum.Enum):
TODO = "todo"
IN_PROGRESS = "in_progress"
DONE = "done"
ARCHIVED = "archived"
class TaskPriority(str, enum.Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
email: Mapped[str] = mapped_column(
String(255), unique=True, index=True, nullable=False
)
username: Mapped[str] = mapped_column(
String(100), unique=True, index=True, nullable=False
)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
# Relationships
projects: Mapped[list["Project"]] = relationship(back_populates="owner")
class Project(Base):
__tablename__ = "projects"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
name: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
owner_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id"), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
# Relationships
owner: Mapped["User"] = relationship(back_populates="projects")
tasks: Mapped[list["Task"]] = relationship(
back_populates="project", cascade="all, delete-orphan"
)
class Task(Base):
__tablename__ = "tasks"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
title: Mapped[str] = mapped_column(String(300), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
status: Mapped[TaskStatus] = mapped_column(
SAEnum(TaskStatus), default=TaskStatus.TODO
)
priority: Mapped[TaskPriority] = mapped_column(
SAEnum(TaskPriority), default=TaskPriority.MEDIUM
)
due_date: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
project_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
# Relationships
project: Mapped["Project"] = relationship(back_populates="tasks")Step 6: Pydantic Schemas for Validation
Create app/schemas/schemas.py:
import uuid
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
from app.models.models import TaskStatus, TaskPriority
# --- Auth Schemas ---
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=100)
password: str = Field(..., min_length=8)
class UserResponse(BaseModel):
id: uuid.UUID
email: str
username: str
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
user_id: uuid.UUID | None = None
# --- Project Schemas ---
class ProjectCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=200)
description: str | None = None
class ProjectUpdate(BaseModel):
name: str | None = Field(None, min_length=1, max_length=200)
description: str | None = None
class ProjectResponse(BaseModel):
id: uuid.UUID
name: str
description: str | None
owner_id: uuid.UUID
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
# --- Task Schemas ---
class TaskCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=300)
description: str | None = None
status: TaskStatus = TaskStatus.TODO
priority: TaskPriority = TaskPriority.MEDIUM
due_date: datetime | None = None
class TaskUpdate(BaseModel):
title: str | None = Field(None, min_length=1, max_length=300)
description: str | None = None
status: TaskStatus | None = None
priority: TaskPriority | None = None
due_date: datetime | None = None
class TaskResponse(BaseModel):
id: uuid.UUID
title: str
description: str | None
status: TaskStatus
priority: TaskPriority
due_date: datetime | None
project_id: uuid.UUID
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}Step 7: Authentication with JWT
Create app/core/security.py:
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import get_settings
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import TokenData
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
token_data = TokenData(user_id=user_id)
except JWTError:
raise credentials_exception
result = await db.execute(
select(User).where(User.id == token_data.user_id)
)
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return userStep 8: API Routes
Authentication Routes — app/api/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import UserCreate, UserResponse, Token
from app.core.security import hash_password, verify_password, create_access_token
router = APIRouter(prefix="/api/auth", tags=["Authentication"])
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
# Check if email already exists
result = await db.execute(select(User).where(User.email == user_data.email))
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
# Check if username already exists
result = await db.execute(
select(User).where(User.username == user_data.username)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken",
)
user = User(
email=user_data.email,
username=user_data.username,
hashed_password=hash_password(user_data.password),
)
db.add(user)
await db.flush()
await db.refresh(user)
return user
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(User).where(User.email == form_data.username)
)
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
)
access_token = create_access_token(data={"sub": str(user.id)})
return Token(access_token=access_token)Project Routes — app/api/projects.py
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.models import Project, User
from app.schemas.schemas import ProjectCreate, ProjectUpdate, ProjectResponse
from app.core.security import get_current_user
router = APIRouter(prefix="/api/projects", tags=["Projects"])
@router.get("/", response_model=list[ProjectResponse])
async def list_projects(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(Project.owner_id == current_user.id)
)
return result.scalars().all()
@router.post("/", response_model=ProjectResponse, status_code=201)
async def create_project(
project_data: ProjectCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
project = Project(**project_data.model_dump(), owner_id=current_user.id)
db.add(project)
await db.flush()
await db.refresh(project)
return project
@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(
project_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project
@router.patch("/{project_id}", response_model=ProjectResponse)
async def update_project(
project_id: UUID,
project_data: ProjectUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
update_data = project_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(project, field, value)
await db.flush()
await db.refresh(project)
return project
@router.delete("/{project_id}", status_code=204)
async def delete_project(
project_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
await db.delete(project)Task Routes — app/api/tasks.py
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.models import Task, Project, User, TaskStatus
from app.schemas.schemas import TaskCreate, TaskUpdate, TaskResponse
from app.core.security import get_current_user
router = APIRouter(prefix="/api/projects/{project_id}/tasks", tags=["Tasks"])
@router.get("/", response_model=list[TaskResponse])
async def list_tasks(
project_id: UUID,
status: TaskStatus | None = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
# Verify project ownership
proj = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
if not proj.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Project not found")
query = select(Task).where(Task.project_id == project_id)
if status:
query = query.where(Task.status == status)
result = await db.execute(query.order_by(Task.created_at.desc()))
return result.scalars().all()
@router.post("/", response_model=TaskResponse, status_code=201)
async def create_task(
project_id: UUID,
task_data: TaskCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
proj = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
if not proj.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Project not found")
task = Task(**task_data.model_dump(), project_id=project_id)
db.add(task)
await db.flush()
await db.refresh(task)
return task
@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
project_id: UUID,
task_id: UUID,
task_data: TaskUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Task)
.join(Project)
.where(
Task.id == task_id,
Task.project_id == project_id,
Project.owner_id == current_user.id,
)
)
task = result.scalar_one_or_none()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
for field, value in task_data.model_dump(exclude_unset=True).items():
setattr(task, field, value)
await db.flush()
await db.refresh(task)
return task
@router.delete("/{task_id}", status_code=204)
async def delete_task(
project_id: UUID,
task_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Task)
.join(Project)
.where(
Task.id == task_id,
Task.project_id == project_id,
Project.owner_id == current_user.id,
)
)
task = result.scalar_one_or_none()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
await db.delete(task)Step 9: Main Application Entry Point
Create app/main.py:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import get_settings
from app.api import auth, projects, tasks
settings = get_settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print(f"🚀 {settings.APP_NAME} v{settings.APP_VERSION} starting...")
yield
# Shutdown
print("👋 Shutting down...")
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth.router)
app.include_router(projects.router)
app.include_router(tasks.router)
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": settings.APP_VERSION}Step 10: Database Migrations with Alembic
Initialize Alembic and configure it for async:
alembic init alembicUpdate alembic/env.py to use async:
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.database import Base
from app.models.models import User, Project, Task # Import all models
from app.core.config import get_settings
config = context.config
settings = get_settings()
# Set the database URL dynamically
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()Create and run your first migration:
alembic revision --autogenerate -m "initial tables"
alembic upgrade headStep 11: Dockerize Everything
Create a Dockerfile:
FROM python:3.12-slim AS base
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev && \
rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Production stage
FROM base AS production
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]Create docker-compose.yml:
services:
db:
image: postgres:16-alpine
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: taskflow
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
api:
build:
context: .
target: production
ports:
- "8000:8000"
environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
SECRET_KEY: ${SECRET_KEY:-change-me-in-production-please}
DEBUG: "false"
depends_on:
db:
condition: service_healthy
restart: unless-stopped
migrate:
build:
context: .
target: base
command: alembic upgrade head
environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
depends_on:
db:
condition: service_healthy
volumes:
postgres_data:Launch everything:
docker compose up -dWait a moment, then verify:
# Check containers are running
docker compose ps
# Check API health
curl http://localhost:8000/healthYou should see:
{"status": "healthy", "version": "1.0.0"}Visit http://localhost:8000/docs to see the auto-generated Swagger UI. You can test all endpoints directly from the browser!
Step 12: Testing Your API
Create tests/test_api.py:
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
def anyio_backend():
return "asyncio"
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.mark.anyio
async def test_health_check(client: AsyncClient):
response = await client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
@pytest.mark.anyio
async def test_register_user(client: AsyncClient):
response = await client.post(
"/api/auth/register",
json={
"email": "test@example.com",
"username": "testuser",
"password": "securepassword123",
},
)
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert data["username"] == "testuser"
assert "hashed_password" not in data
@pytest.mark.anyio
async def test_login_and_create_project(client: AsyncClient):
# Register
await client.post(
"/api/auth/register",
json={
"email": "dev@example.com",
"username": "developer",
"password": "securepassword123",
},
)
# Login
login_response = await client.post(
"/api/auth/login",
data={"username": "dev@example.com", "password": "securepassword123"},
)
token = login_response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# Create project
response = await client.post(
"/api/projects/",
json={"name": "My Project", "description": "A test project"},
headers=headers,
)
assert response.status_code == 201
assert response.json()["name"] == "My Project"Run the tests:
pytest tests/ -vStep 13: Production Hardening Tips
Before deploying to production, consider these improvements:
1. Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, ...):
...2. Structured Logging
import structlog
logger = structlog.get_logger()
@app.middleware("http")
async def log_requests(request, call_next):
logger.info("request_started", method=request.method, path=request.url.path)
response = await call_next(request)
logger.info("request_completed", status_code=response.status_code)
return response3. Health Check with Database
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
try:
await db.execute(text("SELECT 1"))
return {"status": "healthy", "database": "connected"}
except Exception:
return JSONResponse(
status_code=503,
content={"status": "unhealthy", "database": "disconnected"},
)4. Environment-Specific Docker Compose
Create docker-compose.prod.yml to override for production with proper secrets management, resource limits, and monitoring.
Summary
In this tutorial, you've built a complete production-ready REST API with:
- ✅ FastAPI — async Python web framework with automatic OpenAPI docs
- ✅ SQLAlchemy 2.0 — modern async ORM with mapped columns
- ✅ PostgreSQL — robust relational database
- ✅ Alembic — version-controlled database migrations
- ✅ JWT Authentication — secure token-based auth
- ✅ Docker Compose — containerized development and deployment
- ✅ Pydantic v2 — fast data validation and serialization
- ✅ Testing — async tests with httpx and pytest
Next Steps
- Add pagination to list endpoints
- Implement WebSocket for real-time task updates
- Set up CI/CD with GitHub Actions
- Add Redis caching for frequent queries
- Deploy to a cloud provider (AWS ECS, GCP Cloud Run, or a VPS with Docker)
FastAPI's auto-generated documentation at /docs and /redoc makes it incredibly easy for frontend teams and API consumers to understand and integrate with your API — no separate docs needed.
Happy coding! 🚀
Discuss Your Project with Us
We're here to help with your web development needs. Schedule a call to discuss your project and how we can assist you.
Let's find the best solutions for your needs.
Related Articles

Deploy a Next.js Application with Docker and CI/CD in Production
Learn how to containerize your Next.js application with Docker, set up a CI/CD pipeline with GitHub Actions, and deploy to production on a VPS. A complete guide from development to automated deployment.

Build a Full-Stack App with Drizzle ORM and Next.js 15: Type-Safe Database from Zero to Production
Learn how to build a type-safe full-stack application using Drizzle ORM with Next.js 15. This hands-on tutorial covers schema design, migrations, Server Actions, CRUD operations, and deployment with PostgreSQL.

Building REST APIs with Rust and Axum: A Practical Beginner's Guide
Learn how to build fast, safe REST APIs using Rust and the Axum web framework. This step-by-step guide covers project setup, routing, JSON handling, database integration with SQLx, error handling, and testing — from zero to a working API.