Construire des API REST prêtes pour la production avec FastAPI, PostgreSQL et Docker

AI Bot
Par AI Bot ·

Chargement du lecteur de synthèse vocale...

FastAPI est le framework Python qui connaît la plus forte croissance — et ce n'est pas un hasard. Documentation automatique, typage fort, support asynchrone et performances exceptionnelles. Dans ce tutoriel, vous allez construire une API REST complète de A à Z et la conteneuriser pour la production.

Ce que vous allez construire

Un système complet de gestion de tâches (TaskFlow) comprenant :

  • Opérations CRUD complètes pour les projets et les tâches
  • Authentification utilisateur avec des tokens JWT
  • Base de données PostgreSQL avec l'ORM SQLAlchemy
  • Migrations de base de données avec Alembic
  • Docker Compose pour le développement et la production
  • Documentation API automatique (Swagger & ReDoc)
  • Validation des entrées et gestion des erreurs
  • Configuration basée sur l'environnement

Prérequis

Avant de commencer, assurez-vous d'avoir :

  • Python 3.11+ installé (python.org)
  • Docker et Docker Compose (docker.com)
  • Des connaissances de base en Python et en API REST
  • Un éditeur de code (VS Code recommandé)
  • Un terminal

Tout le code de ce tutoriel est disponible sur GitHub. Vous pouvez suivre pas à pas ou cloner directement le résultat final.


Étape 1 : Mise en place et structure du projet

Créez le répertoire du projet avec une structure propre :

mkdir taskflow-api && cd taskflow-api
mkdir -p app/{api,core,models,schemas,services}
touch app/__init__.py app/main.py app/database.py
touch app/api/__init__.py app/core/__init__.py
touch app/models/__init__.py app/schemas/__init__.py
touch app/services/__init__.py
touch requirements.txt .env .env.example

Votre projet devrait ressembler à ceci :

taskflow-api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── projects.py
│   │   └── tasks.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py
│   │   └── security.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── models.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   └── schemas.py
│   └── services/
│       ├── __init__.py
│       └── auth_service.py
├── alembic/
├── tests/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── .env
└── alembic.ini

Cette structure sépare clairement les responsabilités : les routes API, la logique métier, les modèles de données et les schémas de validation. C'est un pattern éprouvé qui facilite la maintenance à mesure que le projet grandit.

Étape 2 : Installation des dépendances

Ajoutez les dépendances suivantes dans requirements.txt :

fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
asyncpg==0.30.0
alembic==1.14.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.12
pydantic-settings==2.6.0
httpx==0.28.0
pytest==8.3.0
pytest-asyncio==0.24.0

Créez un environnement virtuel et installez :

python -m venv venv
source venv/bin/activate  # Sous Windows : venv\Scripts\activate
pip install -r requirements.txt

Étape 3 : Gestion de la configuration

Créez app/core/config.py pour gérer les variables d'environnement :

from pydantic_settings import BaseSettings
from functools import lru_cache
 
 
class Settings(BaseSettings):
    # Application
    APP_NAME: str = "TaskFlow API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
 
    # Base de données
    DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow"
 
    # Authentification
    SECRET_KEY: str = "your-secret-key-change-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
 
    # CORS
    ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
 
    class Config:
        env_file = ".env"
        case_sensitive = True
 
 
@lru_cache()
def get_settings() -> Settings:
    return Settings()

L'utilisation de @lru_cache() est importante ici : elle garantit que la configuration n'est lue qu'une seule fois et mise en cache. Sans cela, chaque appel à get_settings() relirait le fichier .env.

Créez votre fichier .env :

APP_NAME=TaskFlow API
DEBUG=true
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow
SECRET_KEY=votre-cle-secrete-dau-moins-32-caracteres-de-long

Ne commitez jamais votre fichier .env dans le contrôle de version. Ajoutez-le immédiatement au .gitignore. Utilisez .env.example avec des valeurs fictives pour la documentation.

Étape 4 : Configuration de la base de données avec SQLAlchemy

Créez la connexion asynchrone dans app/database.py :

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
 
from app.core.config import get_settings
 
settings = get_settings()
 
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
 
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
 
 
class Base(DeclarativeBase):
    pass
 
 
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

Le paramètre pool_pre_ping=True vérifie que la connexion est toujours valide avant chaque requête. C'est essentiel en production pour éviter les erreurs liées aux connexions interrompues.

Étape 5 : Définition des modèles

Créez app/models/models.py :

import uuid
from datetime import datetime
 
from sqlalchemy import (
    Boolean,
    DateTime,
    ForeignKey,
    String,
    Text,
    Enum as SAEnum,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
 
from app.database import Base
import enum
 
 
class TaskStatus(str, enum.Enum):
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    ARCHIVED = "archived"
 
 
class TaskPriority(str, enum.Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
 
 
class User(Base):
    __tablename__ = "users"
 
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    email: Mapped[str] = mapped_column(
        String(255), unique=True, index=True, nullable=False
    )
    username: Mapped[str] = mapped_column(
        String(100), unique=True, index=True, nullable=False
    )
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
 
    # Relations
    projects: Mapped[list["Project"]] = relationship(back_populates="owner")
 
 
class Project(Base):
    __tablename__ = "projects"
 
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    owner_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("users.id"), nullable=False
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
 
    # Relations
    owner: Mapped["User"] = relationship(back_populates="projects")
    tasks: Mapped[list["Task"]] = relationship(
        back_populates="project", cascade="all, delete-orphan"
    )
 
 
class Task(Base):
    __tablename__ = "tasks"
 
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    title: Mapped[str] = mapped_column(String(300), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    status: Mapped[TaskStatus] = mapped_column(
        SAEnum(TaskStatus), default=TaskStatus.TODO
    )
    priority: Mapped[TaskPriority] = mapped_column(
        SAEnum(TaskPriority), default=TaskPriority.MEDIUM
    )
    due_date: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    project_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
 
    # Relations
    project: Mapped["Project"] = relationship(back_populates="tasks")

Nous utilisons la syntaxe Mapped avec mapped_column, qui est l'approche moderne de SQLAlchemy 2.0. Elle offre un support complet du typage, ce qui signifie que votre IDE vous donnera des suggestions et des avertissements précis.

Étape 6 : Schémas Pydantic pour la validation

Créez app/schemas/schemas.py :

import uuid
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
 
from app.models.models import TaskStatus, TaskPriority
 
 
# --- Schémas d'authentification ---
class UserCreate(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=100)
    password: str = Field(..., min_length=8)
 
 
class UserResponse(BaseModel):
    id: uuid.UUID
    email: str
    username: str
    is_active: bool
    created_at: datetime
 
    model_config = {"from_attributes": True}
 
 
class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"
 
 
class TokenData(BaseModel):
    user_id: uuid.UUID | None = None
 
 
# --- Schémas de projets ---
class ProjectCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    description: str | None = None
 
 
class ProjectUpdate(BaseModel):
    name: str | None = Field(None, min_length=1, max_length=200)
    description: str | None = None
 
 
class ProjectResponse(BaseModel):
    id: uuid.UUID
    name: str
    description: str | None
    owner_id: uuid.UUID
    created_at: datetime
    updated_at: datetime
 
    model_config = {"from_attributes": True}
 
 
# --- Schémas de tâches ---
class TaskCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=300)
    description: str | None = None
    status: TaskStatus = TaskStatus.TODO
    priority: TaskPriority = TaskPriority.MEDIUM
    due_date: datetime | None = None
 
 
class TaskUpdate(BaseModel):
    title: str | None = Field(None, min_length=1, max_length=300)
    description: str | None = None
    status: TaskStatus | None = None
    priority: TaskPriority | None = None
    due_date: datetime | None = None
 
 
class TaskResponse(BaseModel):
    id: uuid.UUID
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    project_id: uuid.UUID
    created_at: datetime
    updated_at: datetime
 
    model_config = {"from_attributes": True}

La séparation entre les schémas d'entrée (Create/Update) et de sortie (Response) est fondamentale. Vous ne voulez jamais exposer le hashed_password dans une réponse API !

Étape 7 : Authentification JWT

Créez app/core/security.py :

from datetime import datetime, timedelta, timezone
 
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.core.config import get_settings
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import TokenData
 
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
 
 
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)
 
 
def hash_password(password: str) -> str:
    return pwd_context.hash(password)
 
 
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
 
 
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Impossible de valider les identifiants",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
        token_data = TokenData(user_id=user_id)
    except JWTError:
        raise credentials_exception
 
    result = await db.execute(
        select(User).where(User.id == token_data.user_id)
    )
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception
    return user

L'élégance du système Depends de FastAPI, c'est qu'il construit automatiquement la chaîne d'injection de dépendances. Quand vous passez get_current_user comme paramètre d'un endpoint, FastAPI vérifie le token et récupère l'utilisateur avant d'exécuter la fonction.

Étape 8 : Routes de l'API

Routes d'authentification — app/api/auth.py

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import UserCreate, UserResponse, Token
from app.core.security import hash_password, verify_password, create_access_token
 
router = APIRouter(prefix="/api/auth", tags=["Authentification"])
 
 
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
    # Vérifier si l'email existe déjà
    result = await db.execute(select(User).where(User.email == user_data.email))
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cet email est déjà enregistré",
        )
 
    # Vérifier si le nom d'utilisateur est pris
    result = await db.execute(
        select(User).where(User.username == user_data.username)
    )
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Ce nom d'utilisateur est déjà pris",
        )
 
    user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
    )
    db.add(user)
    await db.flush()
    await db.refresh(user)
    return user
 
 
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    result = await db.execute(
        select(User).where(User.email == form_data.username)
    )
    user = result.scalar_one_or_none()
 
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email ou mot de passe incorrect",
        )
 
    access_token = create_access_token(data={"sub": str(user.id)})
    return Token(access_token=access_token)

Routes des projets — app/api/projects.py

from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.database import get_db
from app.models.models import Project, User
from app.schemas.schemas import ProjectCreate, ProjectUpdate, ProjectResponse
from app.core.security import get_current_user
 
router = APIRouter(prefix="/api/projects", tags=["Projets"])
 
 
@router.get("/", response_model=list[ProjectResponse])
async def list_projects(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(Project.owner_id == current_user.id)
    )
    return result.scalars().all()
 
 
@router.post("/", response_model=ProjectResponse, status_code=201)
async def create_project(
    project_data: ProjectCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    project = Project(**project_data.model_dump(), owner_id=current_user.id)
    db.add(project)
    await db.flush()
    await db.refresh(project)
    return project
 
 
@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(
    project_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Projet non trouvé")
    return project
 
 
@router.patch("/{project_id}", response_model=ProjectResponse)
async def update_project(
    project_id: UUID,
    project_data: ProjectUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Projet non trouvé")
 
    update_data = project_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(project, field, value)
 
    await db.flush()
    await db.refresh(project)
    return project
 
 
@router.delete("/{project_id}", status_code=204)
async def delete_project(
    project_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    project = result.scalar_one_or_none()
    if not project:
        raise HTTPException(status_code=404, detail="Projet non trouvé")
 
    await db.delete(project)

Routes des tâches — app/api/tasks.py

from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
 
from app.database import get_db
from app.models.models import Task, Project, User, TaskStatus
from app.schemas.schemas import TaskCreate, TaskUpdate, TaskResponse
from app.core.security import get_current_user
 
router = APIRouter(prefix="/api/projects/{project_id}/tasks", tags=["Tâches"])
 
 
@router.get("/", response_model=list[TaskResponse])
async def list_tasks(
    project_id: UUID,
    status: TaskStatus | None = Query(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    proj = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    if not proj.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Projet non trouvé")
 
    query = select(Task).where(Task.project_id == project_id)
    if status:
        query = query.where(Task.status == status)
 
    result = await db.execute(query.order_by(Task.created_at.desc()))
    return result.scalars().all()
 
 
@router.post("/", response_model=TaskResponse, status_code=201)
async def create_task(
    project_id: UUID,
    task_data: TaskCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    proj = await db.execute(
        select(Project).where(
            Project.id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    if not proj.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="Projet non trouvé")
 
    task = Task(**task_data.model_dump(), project_id=project_id)
    db.add(task)
    await db.flush()
    await db.refresh(task)
    return task
 
 
@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
    project_id: UUID,
    task_id: UUID,
    task_data: TaskUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Task)
        .join(Project)
        .where(
            Task.id == task_id,
            Task.project_id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    task = result.scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="Tâche non trouvée")
 
    for field, value in task_data.model_dump(exclude_unset=True).items():
        setattr(task, field, value)
 
    await db.flush()
    await db.refresh(task)
    return task
 
 
@router.delete("/{task_id}", status_code=204)
async def delete_task(
    project_id: UUID,
    task_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    result = await db.execute(
        select(Task)
        .join(Project)
        .where(
            Task.id == task_id,
            Task.project_id == project_id,
            Project.owner_id == current_user.id,
        )
    )
    task = result.scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="Tâche non trouvée")
 
    await db.delete(task)

Étape 9 : Point d'entrée principal

Créez app/main.py :

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
 
from app.core.config import get_settings
from app.api import auth, projects, tasks
 
settings = get_settings()
 
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Démarrage
    print(f"🚀 {settings.APP_NAME} v{settings.APP_VERSION} en cours de démarrage...")
    yield
    # Arrêt
    print("👋 Arrêt en cours...")
 
 
app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)
 
# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
 
# Enregistrement des routes
app.include_router(auth.router)
app.include_router(projects.router)
app.include_router(tasks.router)
 
 
@app.get("/health")
async def health_check():
    return {"status": "healthy", "version": settings.APP_VERSION}

Étape 10 : Migrations avec Alembic

Initialisez Alembic et configurez-le pour le mode asynchrone :

alembic init alembic

Modifiez alembic/env.py pour supporter l'async :

import asyncio
from logging.config import fileConfig
 
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
 
from app.database import Base
from app.models.models import User, Project, Task  # Importer tous les modèles
from app.core.config import get_settings
 
config = context.config
settings = get_settings()
 
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
 
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
 
target_metadata = Base.metadata
 
 
def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()
 
 
def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()
 
 
async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()
 
 
def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())
 
 
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Créez et appliquez votre première migration :

alembic revision --autogenerate -m "tables initiales"
alembic upgrade head

Étape 11 : Conteneurisation avec Docker

Créez le Dockerfile :

FROM python:3.12-slim AS base
 
WORKDIR /app
 
# Dépendances système
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc libpq-dev && \
    rm -rf /var/lib/apt/lists/*
 
# Dépendances Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
# Code applicatif
COPY . .
 
# Étape de production
FROM base AS production
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
 
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Créez le docker-compose.yml :

services:
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: taskflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5
 
  api:
    build:
      context: .
      target: production
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
      SECRET_KEY: ${SECRET_KEY:-changez-moi-en-production-svp}
      DEBUG: "false"
    depends_on:
      db:
        condition: service_healthy
    restart: unless-stopped
 
  migrate:
    build:
      context: .
      target: base
    command: alembic upgrade head
    environment:
      DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
    depends_on:
      db:
        condition: service_healthy
 
volumes:
  postgres_data:

Lancez le tout :

docker compose up -d

Attendez un instant, puis vérifiez :

# Vérifier que les conteneurs tournent
docker compose ps
 
# Vérifier la santé de l'API
curl http://localhost:8000/health

Vous devriez obtenir :

{"status": "healthy", "version": "1.0.0"}

Rendez-vous sur http://localhost:8000/docs pour voir l'interface Swagger générée automatiquement. Vous pouvez tester tous les endpoints directement depuis votre navigateur !

Étape 12 : Tester votre API

Créez tests/test_api.py :

import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
 
 
@pytest.fixture
def anyio_backend():
    return "asyncio"
 
 
@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
 
 
@pytest.mark.anyio
async def test_health_check(client: AsyncClient):
    response = await client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"
 
 
@pytest.mark.anyio
async def test_inscription_utilisateur(client: AsyncClient):
    response = await client.post(
        "/api/auth/register",
        json={
            "email": "test@example.com",
            "username": "testuser",
            "password": "motdepasse123securise",
        },
    )
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert data["username"] == "testuser"
    assert "hashed_password" not in data
 
 
@pytest.mark.anyio
async def test_connexion_et_creation_projet(client: AsyncClient):
    # Inscription
    await client.post(
        "/api/auth/register",
        json={
            "email": "dev@example.com",
            "username": "developpeur",
            "password": "motdepasse123securise",
        },
    )
 
    # Connexion
    login_response = await client.post(
        "/api/auth/login",
        data={"username": "dev@example.com", "password": "motdepasse123securise"},
    )
    token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {token}"}
 
    # Création de projet
    response = await client.post(
        "/api/projects/",
        json={"name": "Mon Projet", "description": "Un projet de test"},
        headers=headers,
    )
    assert response.status_code == 201
    assert response.json()["name"] == "Mon Projet"

Lancez les tests :

pytest tests/ -v

Étape 13 : Conseils pour la mise en production

Avant de déployer en production, ajoutez ces améliorations :

1. Limitation du débit (Rate Limiting)

from slowapi import Limiter
from slowapi.util import get_remote_address
 
limiter = Limiter(key_func=get_remote_address)
 
@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, ...):
    ...

2. Journalisation structurée

import structlog
 
logger = structlog.get_logger()
 
@app.middleware("http")
async def log_requests(request, call_next):
    logger.info("requete_demarree", method=request.method, path=request.url.path)
    response = await call_next(request)
    logger.info("requete_terminee", status_code=response.status_code)
    return response

3. Vérification de santé avec la base de données

@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
        return {"status": "healthy", "database": "connected"}
    except Exception:
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "database": "disconnected"},
        )

4. Docker Compose spécifique à l'environnement

Créez un docker-compose.prod.yml pour surcharger les paramètres de production avec une gestion appropriée des secrets, des limites de ressources et du monitoring.


Résumé

Dans ce tutoriel, vous avez construit une API REST complète et prête pour la production avec :

  • FastAPI — framework web Python asynchrone avec documentation OpenAPI automatique
  • SQLAlchemy 2.0 — ORM moderne avec support complet du typage
  • PostgreSQL — base de données relationnelle robuste
  • Alembic — migrations versionnées de la base de données
  • JWT — authentification sécurisée par tokens
  • Docker Compose — conteneurs pour le développement et le déploiement
  • Pydantic v2 — validation rapide des données
  • Tests — tests asynchrones avec httpx et pytest

Prochaines étapes

  • Ajouter la pagination aux endpoints de liste
  • Implémenter des WebSockets pour les mises à jour en temps réel
  • Mettre en place un pipeline CI/CD avec GitHub Actions
  • Ajouter du cache Redis pour les requêtes fréquentes
  • Déployer sur un service cloud (AWS ECS, GCP Cloud Run ou un VPS avec Docker)

La documentation générée automatiquement par FastAPI sur /docs et /redoc facilite énormément la collaboration avec les équipes frontend et les consommateurs de l'API — pas besoin de rédiger une documentation séparée.

Bon code ! 🚀


Discutez de votre projet avec nous

Nous sommes ici pour vous aider avec vos besoins en développement Web. Planifiez un appel pour discuter de votre projet et comment nous pouvons vous aider.

Trouvons les meilleures solutions pour vos besoins.

Articles connexes

Construire des API REST avec Rust et Axum : guide pratique pour débutants

Apprenez à construire des API REST rapides et sûres avec Rust et le framework web Axum. Ce guide étape par étape couvre la configuration du projet, le routage, la gestion JSON, l'intégration base de données avec SQLx, la gestion des erreurs et les tests — de zéro à une API fonctionnelle.

30 min read·