Building Production-Ready REST APIs with FastAPI, PostgreSQL, and Docker

By AI Bot

FastAPI has become one of the most popular Python web frameworks for good reasons: automatic interactive docs, validation driven by type hints, async support, and strong performance. In this tutorial, you'll build a complete REST API from scratch and containerize it for production.

What You'll Build

A fully functional Task Management API with:

  • CRUD operations for projects and tasks
  • User authentication with JWT tokens
  • PostgreSQL database with SQLAlchemy ORM
  • Database migrations with Alembic
  • Docker Compose for local development and production
  • Automated API documentation (Swagger & ReDoc)
  • Input validation and error handling
  • Environment-based configuration

Prerequisites

Before starting, make sure you have:

  • Python 3.11+ installed (python.org)
  • Docker and Docker Compose (docker.com)
  • Basic knowledge of Python and REST APIs
  • A code editor (VS Code recommended)
  • A terminal/command line

All code in this tutorial is available on GitHub. You can follow along step by step or clone the final result.


Step 1: Project Setup and Structure

Create your project directory and set up a clean structure:

mkdir taskflow-api && cd taskflow-api
mkdir -p app/{api,core,models,schemas,services}
touch app/__init__.py app/main.py app/database.py
touch app/api/__init__.py app/core/__init__.py
touch app/models/__init__.py app/schemas/__init__.py
touch app/services/__init__.py
touch requirements.txt .env .env.example

The commands above create only the skeleton. By the end of the tutorial, your project will look like this:

taskflow-api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── projects.py
│   │   └── tasks.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py
│   │   └── security.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── models.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   └── schemas.py
│   └── services/
│       ├── __init__.py
│       └── auth_service.py
├── alembic/
├── tests/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── .env
└── alembic.ini

Step 2: Install Dependencies

Add the following to requirements.txt:

fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
asyncpg==0.30.0
alembic==1.14.0
python-jose[cryptography]==3.3.0
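passlib[bcrypt]==1.7.4
bcrypt==4.0.1  # passlib 1.7.4 has known problems with newer bcrypt releases
python-multipart==0.0.12
pydantic-settings==2.6.0
email-validator==2.2.0  # required by pydantic's EmailStr in Step 6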
httpx==0.28.0
pytest==8.3.0
pytest-asyncio==0.24.0

Create a virtual environment and install:

python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt

Step 3: Configuration Management

Create app/core/config.py to handle environment variables:

from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Pydantic v2 style configuration (the inner Config class is deprecated)
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Application
    APP_NAME: str = "TaskFlow API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # Database
    DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow"

    # Authentication
    SECRET_KEY: str = "your-secret-key-change-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # CORS (when set through an environment variable, provide a JSON list)
    ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
 
 
@lru_cache()
def get_settings() -> Settings:
    return Settings()
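
Because get_settings() is wrapped in lru_cache, the Settings object is built once per process, and later changes to environment variables are not picked up until the cache is cleared. The sketch below illustrates that behavior with the standard library only. DemoSettings and get_demo_settings are hypothetical stand-ins for the tutorial's Settings and get_settings, not part of pydantic-settings.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the tutorial's Settings class."""

    app_name: str
    debug: bool


@lru_cache()
def get_demo_settings() -> DemoSettings:
    # Read the environment once; later calls return the cached object.
    return DemoSettings(
        app_name=os.environ.get("APP_NAME", "TaskFlow API"),
        debug=os.environ.get("DEBUG", "false").lower() == "true",
    )


os.environ["APP_NAME"] = "First"
first = get_demo_settings()

os.environ["APP_NAME"] = "Second"
cached = get_demo_settings()     # still the object built from "First"

get_demo_settings.cache_clear()  # e.g. between tests that change env vars
refreshed = get_demo_settings()  # rebuilt from the current environment
```

The same cache_clear() call is available on the tutorial's get_settings, which can be handy in tests that override configuration.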

Create your .env file:

APP_NAME=TaskFlow API
DEBUG=true
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow
SECRET_KEY=your-super-secret-key-at-least-32-characters-long

Never commit your .env file to version control. Add it to .gitignore immediately. Use .env.example with placeholder values for documentation.
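
The placeholder SECRET_KEY above should be replaced with a random value before any real use. One way to produce one is Python's standard secrets module. The helper name generate_secret_key below is a hypothetical convenience, not part of the tutorial's code.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a URL-safe random string built from `num_bytes` random bytes."""
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    # Paste the printed line into .env (never into version control).
    print(f"SECRET_KEY={generate_secret_key()}")
```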

Step 4: Database Setup with SQLAlchemy

Create the async database connection in app/database.py:

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
 
from app.core.config import get_settings
 
settings = get_settings()
 
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
 
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
 
 
class Base(DeclarativeBase):
    pass
 
 
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
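
The get_db dependency commits when the request handler finishes without error and rolls back when it raises. The sketch below reproduces that control flow with a fake session so it can run without a database. FakeSession, session_scope, and run_request are hypothetical names used only for illustration.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in that records calls instead of talking to PostgreSQL."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession):
    # Same try/except/finally shape as get_db above.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def run_request(should_fail: bool) -> list[str]:
    session = FakeSession()
    try:
        async with session_scope(session):
            if should_fail:
                raise ValueError("handler failed")
    except ValueError:
        pass  # FastAPI would turn the exception into an error response
    return session.calls


ok_calls = asyncio.run(run_request(should_fail=False))
failed_calls = asyncio.run(run_request(should_fail=True))
```

Note that an HTTPException raised inside a handler is also an Exception, so it takes the rollback path as well.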

Step 5: Define Your Models

Create app/models/models.py:

import enum
import uuid
from datetime import datetime

from sqlalchemy import (
    Boolean,
    DateTime,
    ForeignKey,
    String,
    Text,
    Enum as SAEnum,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.database import Base
 
 
class TaskStatus(str, enum.Enum):
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    ARCHIVED = "archived"
 
 
class TaskPriority(str, enum.Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
 
 
class User(Base):
    __tablename__ = "users"
 
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    email: Mapped[str] = mapped_column(
        String(255), unique=True, index=True, nullable=False
    )
    username: Mapped[str] = mapped_column(
        String(100), unique=True, index=True, nullable=False
    )
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
 
    # Relationships
    projects: Mapped[list["Project"]] = relationship(back_populates="owner")
 
 
class Project(Base):
    __tablename__ = "projects"
 
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    owner_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("users.id"), nullable=False
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
 
    # Relationships
    owner: Mapped["User"] = relationship(back_populates="projects")
    tasks: Mapped[list["Task"]] = relationship(
        back_populates="project", cascade="all, delete-orphan"
    )
 
 
class Task(Base):
    __tablename__ = "tasks"
 
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    title: Mapped[str] = mapped_column(String(300), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    status: Mapped[TaskStatus] = mapped_column(
        SAEnum(TaskStatus), default=TaskStatus.TODO
    )
    priority: Mapped[TaskPriority] = mapped_column(
        SAEnum(TaskPriority), default=TaskPriority.MEDIUM
    )
    due_date: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    project_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
 
    # Relationships
    project: Mapped["Project"] = relationship(back_populates="tasks")

Step 6: Pydantic Schemas for Validation

Create app/schemas/schemas.py:

import uuid
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
 
from app.models.models import TaskStatus, TaskPriority
 
 
# --- Auth Schemas ---
class UserCreate(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=100)
    password: str = Field(..., min_length=8)
 
 
class UserResponse(BaseModel):
    id: uuid.UUID
    email: str
    username: str
    is_active: bool
    created_at: datetime
 
    model_config = {"from_attributes": True}
 
 
class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"
 
 
class TokenData(BaseModel):
    user_id: uuid.UUID | None = None
 
 
# --- Project Schemas ---
class ProjectCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    description: str | None = None
 
 
class ProjectUpdate(BaseModel):
    name: str | None = Field(None, min_length=1, max_length=200)
    description: str | None = None
 
 
class ProjectResponse(BaseModel):
    id: uuid.UUID
    name: str
    description: str | None
    owner_id: uuid.UUID
    created_at: datetime
    updated_at: datetime
 
    model_config = {"from_attributes": True}
 
 
# --- Task Schemas ---
class TaskCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=300)
    description: str | None = None
    status: TaskStatus = TaskStatus.TODO
    priority: TaskPriority = TaskPriority.MEDIUM
    due_date: datetime | None = None
 
 
class TaskUpdate(BaseModel):
    title: str | None = Field(None, min_length=1, max_length=300)
    description: str | None = None
    status: TaskStatus | None = None
    priority: TaskPriority | None = None
    due_date: datetime | None = None
 
 
class TaskResponse(BaseModel):
    id: uuid.UUID
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    project_id: uuid.UUID
    created_at: datetime
    updated_at: datetime
 
    model_config = {"from_attributes": True}

Step 7: Authentication with JWT

Create app/core/security.py:

from datetime import datetime, timedelta, timezone
 
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.core.config import get_settings
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import TokenData
 
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
 
 
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)
 
 
def hash_password(password: str) -> str:
    return pwd_context.hash(password)
 
 
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
 
 
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        user_id: str | None = payload.get("sub")
        if user_id is None:
            raise credentials_exception
        token_data = TokenData(user_id=user_id)
    except (JWTError, ValueError):
        # ValueError also covers pydantic's ValidationError for a malformed UUID
        raise credentials_exception
 
    result = await db.execute(
        select(User).where(User.id == token_data.user_id)
    )
    user = result.scalar_one_or_none()
    if user is None or not user.is_active:
        raise credentials_exception
    return user

Step 8: API Routes

Authentication Routes — app/api/auth.py

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import UserCreate, UserResponse, Token
from app.core.security import hash_password, verify_password, create_access_token
 
router = APIRouter(prefix="/api/auth", tags=["Authentication"])
 
 
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
    # Check if email already exists
    result = await db.execute(select(User).where(User.email == user_data.email))
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )
 
    # Check if username already exists
    result = await db.execute(
        select(User).where(User.username == user_data.username)
    )
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken",
        )
 
    user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
    )
    db.add(user)
    await db.flush()
    await db.refresh(user)
    return user
 
 
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(
        select(User).where(User.email == form_data.username)
    )
    user = result.scalar_one_or_none()
 
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )
 
    access_token = create_access_token(data={"sub": str(user.id)})
    return Token(access_token=access_token)

Project Routes — app/api/projects.py

from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.database import get_db
from app.models.models import Project, User
from app.schemas.schemas import ProjectCreate, ProjectUpdate, ProjectResponse
from app.core.security import get_current_user
 
router = APIRouter(prefix="/api/projects", tags=["Projects"])
 
 
@router.get("/", response_model=list[ProjectResponse])
async def list_projects(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(Project.owner_id == current_user.id)
    )
    return result.scalars().all()
 
 
@router.post("/", response_model=ProjectResponse, status_code=201)
async def create_project(
    project_data: ProjectCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    project = Project(**project_data.model_dump(), owner_id=current_user.id)
    db.add(project)
    await db.flush()
    await db.refresh(project)
    return project
 
 
@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(
    project_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project
 
 
@router.patch("/{project_id}", response_model=ProjectResponse)
async def update_project(
    project_id: UUID,
    project_data: ProjectUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
 
    update_data = project_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(project, field, value)
 
    await db.flush()
    await db.refresh(project)
    return project
 
 
@router.delete("/{project_id}", status_code=204)
async def delete_project(
    project_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
 
    await db.delete(project)

Task Routes — app/api/tasks.py

from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.database import get_db
from app.models.models import Task, Project, User, TaskStatus
from app.schemas.schemas import TaskCreate, TaskUpdate, TaskResponse
from app.core.security import get_current_user
 
router = APIRouter(prefix="/api/projects/{project_id}/tasks", tags=["Tasks"])
 
 
@router.get("/", response_model=list[TaskResponse])
async def list_tasks(
    project_id: UUID,
    status: TaskStatus | None = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    # Verify project ownership
    proj = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    if not proj.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Project not found")
 
    query = select(Task).where(Task.project_id == project_id)
    if status:
        query = query.where(Task.status == status)
 
    result = await db.execute(query.order_by(Task.created_at.desc()))
    return result.scalars().all()
 
 
@router.post("/", response_model=TaskResponse, status_code=201)
async def create_task(
    project_id: UUID,
    task_data: TaskCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    proj = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    if not proj.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Project not found")
 
    task = Task(**task_data.model_dump(), project_id=project_id)
    db.add(task)
    await db.flush()
    await db.refresh(task)
    return task
 
 
@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
    project_id: UUID,
    task_id: UUID,
    task_data: TaskUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Task)
        .join(Project)
        .where(
            Task.id == task_id,
            Task.project_id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    task = result.scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
 
    for field, value in task_data.model_dump(exclude_unset=True).items():
        setattr(task, field, value)
 
    await db.flush()
    await db.refresh(task)
    return task
 
 
@router.delete("/{task_id}", status_code=204)
async def delete_task(
    project_id: UUID,
    task_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Task)
        .join(Project)
        .where(
            Task.id == task_id,
            Task.project_id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    task = result.scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
 
    await db.delete(task)

Step 9: Main Application Entry Point

Create app/main.py:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
 
from app.core.config import get_settings
from app.api import auth, projects, tasks
 
settings = get_settings()
 
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print(f"🚀 {settings.APP_NAME} v{settings.APP_VERSION} starting...")
    yield
    # Shutdown
    print("👋 Shutting down...")
 
 
app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)
 
# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
 
# Include routers
app.include_router(auth.router)
app.include_router(projects.router)
app.include_router(tasks.router)
 
 
@app.get("/health")
async def health_check():
    return {"status": "healthy", "version": settings.APP_VERSION}

Step 10: Database Migrations with Alembic

Initialize Alembic and configure it for async:

alembic init alembic

Update alembic/env.py to use async:

import asyncio
from logging.config import fileConfig
 
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
 
from app.database import Base
from app.models.models import User, Project, Task  # Import all models
from app.core.config import get_settings
 
config = context.config
settings = get_settings()
 
# Set the database URL dynamically
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
 
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
 
target_metadata = Base.metadata
 
 
def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()
 
 
def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()
 
 
async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()
 
 
def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())
 
 
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Create and run your first migration:

alembic revision --autogenerate -m "initial tables"
alembic upgrade head

Step 11: Dockerize Everything

Create a Dockerfile:

FROM python:3.12-slim AS base
 
WORKDIR /app
 
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc libpq-dev && \
    rm -rf /var/lib/apt/lists/*
 
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
# Copy application code
COPY . .
 
# Production stage
FROM base AS production
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
 
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

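Because COPY . . copies everything in the project directory, add a .dockerignore file listing .env, venv/, and .git so that secrets and local artifacts stay out of the image.

Create docker-compose.yml: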

services:
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: taskflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5
 
  api:
    build:
      context: .
      target: production
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
      SECRET_KEY: ${SECRET_KEY:-change-me-in-production-please}
      DEBUG: "false"
    depends_on:
      db:
        condition: service_healthy
      migrate:
        condition: service_completed_successfully
    restart: unless-stopped
 
  migrate:
    build:
      context: .
      target: base
    command: alembic upgrade head
    environment:
      DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
    depends_on:
      db:
        condition: service_healthy
 
volumes:
  postgres_data:

Launch everything:

docker compose up -d

Wait a moment, then verify:

# Check containers are running
docker compose ps
 
# Check API health
curl http://localhost:8000/health

You should see:

{"status":"healthy","version":"1.0.0"}
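
Instead of waiting an arbitrary moment, a script can poll the health endpoint until it answers. The sketch below uses only the standard library. wait_for_health is a hypothetical helper, and the URL in the usage comment assumes the port mapping from the Compose file above.

```python
import json
import time
import urllib.error
import urllib.request


def wait_for_health(url: str, timeout: float = 30.0, interval: float = 1.0) -> dict:
    """Poll `url` until it returns HTTP 200 with a JSON body, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    last_error: Exception | None = None
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as response:
                if response.status == 200:
                    return json.loads(response.read().decode("utf-8"))
        except (urllib.error.URLError, ConnectionError, TimeoutError) as exc:
            last_error = exc  # API not reachable yet; retry after a pause
        time.sleep(interval)
    raise TimeoutError(f"{url} did not become healthy in {timeout}s: {last_error}")


# Usage (with the stack running):
#   wait_for_health("http://localhost:8000/health")
```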

Visit http://localhost:8000/docs to see the auto-generated Swagger UI. You can test all endpoints directly from the browser!

Step 12: Testing Your API

Create tests/test_api.py:

import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
 
 
@pytest.fixture
def anyio_backend():
    return "asyncio"
 
 
@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
 
 
@pytest.mark.anyio
async def test_health_check(client: AsyncClient):
    response = await client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"
 
 
@pytest.mark.anyio
async def test_register_user(client: AsyncClient):
    response = await client.post(
        "/api/auth/register",
        json={
            "email": "test@example.com",
            "username": "testuser",
            "password": "securepassword123",
        },
    )
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert data["username"] == "testuser"
    assert "hashed_password" not in data
 
 
@pytest.mark.anyio
async def test_login_and_create_project(client: AsyncClient):
    # Register
    await client.post(
        "/api/auth/register",
        json={
            "email": "dev@example.com",
            "username": "developer",
            "password": "securepassword123",
        },
    )
 
    # Login
    login_response = await client.post(
        "/api/auth/login",
        data={"username": "dev@example.com", "password": "securepassword123"},
    )
    token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}
 
    # Create project
    response = await client.post(
        "/api/projects/",
        json={"name": "My Project", "description": "A test project"},
        headers=headers,
    )
    assert response.status_code == 201
    assert response.json()["name"] == "My Project"

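Run the tests from the project root. They use the database configured in DATABASE_URL, so PostgreSQL must be running with migrations applied. Invoking pytest through python -m puts the current directory on sys.path, which lets the tests import the app package:

python -m pytest tests/ -v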
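
Because these tests write to a real database, a second run of test_register_user would likely hit the "Email already registered" branch. One simple way around that, sketched below, is to generate unique credentials per test. unique_user_payload is a hypothetical helper, not part of the tutorial's repository.

```python
import uuid


def unique_user_payload(password: str = "securepassword123") -> dict[str, str]:
    """Build a registration payload whose email and username are unique per call."""
    suffix = uuid.uuid4().hex[:12]
    return {
        "email": f"user-{suffix}@example.com",
        "username": f"user_{suffix}",
        "password": password,
    }


first = unique_user_payload()
second = unique_user_payload()
```

In the tests above, json=unique_user_payload() could replace the hard-coded dictionaries. A cleaner long-term option is a dedicated test database that is reset between runs.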

Step 13: Production Hardening Tips

Before deploying to production, consider these improvements:

1. Rate Limiting

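# Requires: pip install slowapi
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Return HTTP 429 when a client exceeds its limit
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@router.post("/login", response_model=Token)
@limiter.limit("5/minute")
async def login(
    request: Request,  # slowapi needs the Request object in the signature
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    ...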

2. Structured Logging

# Requires: pip install structlog
import structlog
from fastapi import Request

logger = structlog.get_logger()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    logger.info("request_started", method=request.method, path=request.url.path)
    response = await call_next(request)
    logger.info(
        "request_completed",
        path=request.url.path,
        status_code=response.status_code,
    )
    return response
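
If adding structlog is not an option, similar key-value output can be approximated with the standard logging module. The sketch below is a swapped-in alternative, not structlog's API. JsonFormatter and the "fields" key passed through extra are hypothetical conventions chosen for this example.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {"event": record.getMessage(), "level": record.levelname.lower()}
        # Structured fields are passed via logger.info(..., extra={"fields": {...}}).
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry)


stream = io.StringIO()  # stands in for stdout so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("taskflow.demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info(
    "request_completed",
    extra={"fields": {"status_code": 200, "path": "/health"}},
)
line = json.loads(stream.getvalue())
```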

3. Health Check with Database

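from fastapi import Depends
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db


# Replaces the simpler /health route defined in app/main.py
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
        return {"status": "healthy", "database": "connected"}
    except Exception:
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "database": "disconnected"},
        )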

4. Environment-Specific Docker Compose

Create docker-compose.prod.yml to override for production with proper secrets management, resource limits, and monitoring.


Summary

In this tutorial, you've built a complete production-ready REST API with:

  • FastAPI — async Python web framework with automatic OpenAPI docs
  • SQLAlchemy 2.0 — modern async ORM with mapped columns
  • PostgreSQL — robust relational database
  • Alembic — version-controlled database migrations
  • JWT Authentication — secure token-based auth
  • Docker Compose — containerized development and deployment
  • Pydantic v2 — fast data validation and serialization
  • Testing — async tests with httpx and pytest

Next Steps

  • Add pagination to list endpoints
  • Implement WebSocket for real-time task updates
  • Set up CI/CD with GitHub Actions
  • Add Redis caching for frequent queries
  • Deploy to a cloud provider (AWS ECS, GCP Cloud Run, or a VPS with Docker)

FastAPI's auto-generated documentation at /docs and /redoc makes it incredibly easy for frontend teams and API consumers to understand and integrate with your API — no separate docs needed.

Happy coding! 🚀

