Construire des API REST prêtes pour la production avec FastAPI, PostgreSQL et Docker

FastAPI est le framework Python qui connaît la plus forte croissance — et ce n'est pas un hasard. Documentation automatique, typage fort, support asynchrone et performances exceptionnelles. Dans ce tutoriel, vous allez construire une API REST complète de A à Z et la conteneuriser pour la production.
Ce que vous allez construire
Un système complet de gestion de tâches (TaskFlow) comprenant :
- Opérations CRUD complètes pour les projets et les tâches
- Authentification utilisateur avec des tokens JWT
- Base de données PostgreSQL avec l'ORM SQLAlchemy
- Migrations de base de données avec Alembic
- Docker Compose pour le développement et la production
- Documentation API automatique (Swagger & ReDoc)
- Validation des entrées et gestion des erreurs
- Configuration basée sur l'environnement
Prérequis
Avant de commencer, assurez-vous d'avoir :
- Python 3.11+ installé (python.org)
- Docker et Docker Compose (docker.com)
- Des connaissances de base en Python et en API REST
- Un éditeur de code (VS Code recommandé)
- Un terminal
Tout le code de ce tutoriel est disponible sur GitHub. Vous pouvez suivre pas à pas ou cloner directement le résultat final.
Étape 1 : Mise en place et structure du projet
Créez le répertoire du projet avec une structure propre :
mkdir taskflow-api && cd taskflow-api
mkdir -p app/{api,core,models,schemas,services}
touch app/__init__.py app/main.py app/database.py
touch app/api/__init__.py app/core/__init__.py
touch app/models/__init__.py app/schemas/__init__.py
touch app/services/__init__.py
touch requirements.txt .env .env.exampleVotre projet devrait ressembler à ceci :
taskflow-api/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── database.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── projects.py
│ │ └── tasks.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py
│ │ └── security.py
│ ├── models/
│ │ ├── __init__.py
│ │ └── models.py
│ ├── schemas/
│ │ ├── __init__.py
│ │ └── schemas.py
│ └── services/
│ ├── __init__.py
│ └── auth_service.py
├── alembic/
├── tests/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── .env
└── alembic.ini
Cette structure sépare clairement les responsabilités : les routes API, la logique métier, les modèles de données et les schémas de validation. C'est un pattern éprouvé qui facilite la maintenance à mesure que le projet grandit.
Étape 2 : Installation des dépendances
Ajoutez les dépendances suivantes dans requirements.txt :
fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
asyncpg==0.30.0
alembic==1.14.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.12
pydantic-settings==2.6.0
httpx==0.28.0
pytest==8.3.0
pytest-asyncio==0.24.0Créez un environnement virtuel et installez :
python -m venv venv
source venv/bin/activate # Sous Windows : venv\Scripts\activate
pip install -r requirements.txtÉtape 3 : Gestion de la configuration
Créez app/core/config.py pour gérer les variables d'environnement :
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# Application
APP_NAME: str = "TaskFlow API"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
# Base de données
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow"
# Authentification
SECRET_KEY: str = "your-secret-key-change-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# CORS
ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
class Config:
env_file = ".env"
case_sensitive = True
@lru_cache()
def get_settings() -> Settings:
return Settings()L'utilisation de @lru_cache() est importante ici : elle garantit que la configuration n'est lue qu'une seule fois et mise en cache. Sans cela, chaque appel à get_settings() relirait le fichier .env.
Créez votre fichier .env :
APP_NAME=TaskFlow API
DEBUG=true
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/taskflow
SECRET_KEY=votre-cle-secrete-dau-moins-32-caracteres-de-longNe commitez jamais votre fichier .env dans le contrôle de version. Ajoutez-le immédiatement au .gitignore. Utilisez .env.example avec des valeurs fictives pour la documentation.
Étape 4 : Configuration de la base de données avec SQLAlchemy
Créez la connexion asynchrone dans app/database.py :
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from app.core.config import get_settings
settings = get_settings()
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()Le paramètre pool_pre_ping=True vérifie que la connexion est toujours valide avant chaque requête. C'est essentiel en production pour éviter les erreurs liées aux connexions interrompues.
Étape 5 : Définition des modèles
Créez app/models/models.py :
import uuid
from datetime import datetime
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
String,
Text,
Enum as SAEnum,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from app.database import Base
import enum
class TaskStatus(str, enum.Enum):
TODO = "todo"
IN_PROGRESS = "in_progress"
DONE = "done"
ARCHIVED = "archived"
class TaskPriority(str, enum.Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
email: Mapped[str] = mapped_column(
String(255), unique=True, index=True, nullable=False
)
username: Mapped[str] = mapped_column(
String(100), unique=True, index=True, nullable=False
)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
# Relations
projects: Mapped[list["Project"]] = relationship(back_populates="owner")
class Project(Base):
__tablename__ = "projects"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
name: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
owner_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id"), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
# Relations
owner: Mapped["User"] = relationship(back_populates="projects")
tasks: Mapped[list["Task"]] = relationship(
back_populates="project", cascade="all, delete-orphan"
)
class Task(Base):
__tablename__ = "tasks"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
title: Mapped[str] = mapped_column(String(300), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
status: Mapped[TaskStatus] = mapped_column(
SAEnum(TaskStatus), default=TaskStatus.TODO
)
priority: Mapped[TaskPriority] = mapped_column(
SAEnum(TaskPriority), default=TaskPriority.MEDIUM
)
due_date: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
project_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("projects.id"), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
# Relations
project: Mapped["Project"] = relationship(back_populates="tasks")Nous utilisons la syntaxe Mapped avec mapped_column, qui est l'approche moderne de SQLAlchemy 2.0. Elle offre un support complet du typage, ce qui signifie que votre IDE vous donnera des suggestions et des avertissements précis.
Étape 6 : Schémas Pydantic pour la validation
Créez app/schemas/schemas.py :
import uuid
from datetime import datetime
from pydantic import BaseModel, EmailStr, Field
from app.models.models import TaskStatus, TaskPriority
# --- Schémas d'authentification ---
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(..., min_length=3, max_length=100)
password: str = Field(..., min_length=8)
class UserResponse(BaseModel):
id: uuid.UUID
email: str
username: str
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
user_id: uuid.UUID | None = None
# --- Schémas de projets ---
class ProjectCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=200)
description: str | None = None
class ProjectUpdate(BaseModel):
name: str | None = Field(None, min_length=1, max_length=200)
description: str | None = None
class ProjectResponse(BaseModel):
id: uuid.UUID
name: str
description: str | None
owner_id: uuid.UUID
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
# --- Schémas de tâches ---
class TaskCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=300)
description: str | None = None
status: TaskStatus = TaskStatus.TODO
priority: TaskPriority = TaskPriority.MEDIUM
due_date: datetime | None = None
class TaskUpdate(BaseModel):
title: str | None = Field(None, min_length=1, max_length=300)
description: str | None = None
status: TaskStatus | None = None
priority: TaskPriority | None = None
due_date: datetime | None = None
class TaskResponse(BaseModel):
id: uuid.UUID
title: str
description: str | None
status: TaskStatus
priority: TaskPriority
due_date: datetime | None
project_id: uuid.UUID
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}La séparation entre les schémas d'entrée (Create/Update) et de sortie (Response) est fondamentale. Vous ne voulez jamais exposer le hashed_password dans une réponse API !
Étape 7 : Authentification JWT
Créez app/core/security.py :
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import get_settings
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import TokenData
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Impossible de valider les identifiants",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
token_data = TokenData(user_id=user_id)
except JWTError:
raise credentials_exception
result = await db.execute(
select(User).where(User.id == token_data.user_id)
)
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return userL'élégance du système Depends de FastAPI, c'est qu'il construit automatiquement la chaîne d'injection de dépendances. Quand vous passez get_current_user comme paramètre d'un endpoint, FastAPI vérifie le token et récupère l'utilisateur avant d'exécuter la fonction.
Étape 8 : Routes de l'API
Routes d'authentification — app/api/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.models import User
from app.schemas.schemas import UserCreate, UserResponse, Token
from app.core.security import hash_password, verify_password, create_access_token
router = APIRouter(prefix="/api/auth", tags=["Authentification"])
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
# Vérifier si l'email existe déjà
result = await db.execute(select(User).where(User.email == user_data.email))
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cet email est déjà enregistré",
)
# Vérifier si le nom d'utilisateur est pris
result = await db.execute(
select(User).where(User.username == user_data.username)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ce nom d'utilisateur est déjà pris",
)
user = User(
email=user_data.email,
username=user_data.username,
hashed_password=hash_password(user_data.password),
)
db.add(user)
await db.flush()
await db.refresh(user)
return user
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
result = await db.execute(
select(User).where(User.email == form_data.username)
)
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email ou mot de passe incorrect",
)
access_token = create_access_token(data={"sub": str(user.id)})
return Token(access_token=access_token)Routes des projets — app/api/projects.py
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.models import Project, User
from app.schemas.schemas import ProjectCreate, ProjectUpdate, ProjectResponse
from app.core.security import get_current_user
router = APIRouter(prefix="/api/projects", tags=["Projets"])
@router.get("/", response_model=list[ProjectResponse])
async def list_projects(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(Project.owner_id == current_user.id)
)
return result.scalars().all()
@router.post("/", response_model=ProjectResponse, status_code=201)
async def create_project(
project_data: ProjectCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
project = Project(**project_data.model_dump(), owner_id=current_user.id)
db.add(project)
await db.flush()
await db.refresh(project)
return project
@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(
project_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Projet non trouvé")
return project
@router.patch("/{project_id}", response_model=ProjectResponse)
async def update_project(
project_id: UUID,
project_data: ProjectUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Projet non trouvé")
update_data = project_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(project, field, value)
await db.flush()
await db.refresh(project)
return project
@router.delete("/{project_id}", status_code=204)
async def delete_project(
project_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="Projet non trouvé")
await db.delete(project)Routes des tâches — app/api/tasks.py
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.models import Task, Project, User, TaskStatus
from app.schemas.schemas import TaskCreate, TaskUpdate, TaskResponse
from app.core.security import get_current_user
router = APIRouter(prefix="/api/projects/{project_id}/tasks", tags=["Tâches"])
@router.get("/", response_model=list[TaskResponse])
async def list_tasks(
project_id: UUID,
status: TaskStatus | None = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
proj = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
if not proj.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Projet non trouvé")
query = select(Task).where(Task.project_id == project_id)
if status:
query = query.where(Task.status == status)
result = await db.execute(query.order_by(Task.created_at.desc()))
return result.scalars().all()
@router.post("/", response_model=TaskResponse, status_code=201)
async def create_task(
project_id: UUID,
task_data: TaskCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
proj = await db.execute(
select(Project).where(
Project.id == project_id,
Project.owner_id == current_user.id,
)
)
if not proj.scalar_one_or_none():
raise HTTPException(status_code=404, detail="Projet non trouvé")
task = Task(**task_data.model_dump(), project_id=project_id)
db.add(task)
await db.flush()
await db.refresh(task)
return task
@router.patch("/{task_id}", response_model=TaskResponse)
async def update_task(
project_id: UUID,
task_id: UUID,
task_data: TaskUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Task)
.join(Project)
.where(
Task.id == task_id,
Task.project_id == project_id,
Project.owner_id == current_user.id,
)
)
task = result.scalar_one_or_none()
if not task:
raise HTTPException(status_code=404, detail="Tâche non trouvée")
for field, value in task_data.model_dump(exclude_unset=True).items():
setattr(task, field, value)
await db.flush()
await db.refresh(task)
return task
@router.delete("/{task_id}", status_code=204)
async def delete_task(
project_id: UUID,
task_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(Task)
.join(Project)
.where(
Task.id == task_id,
Task.project_id == project_id,
Project.owner_id == current_user.id,
)
)
task = result.scalar_one_or_none()
if not task:
raise HTTPException(status_code=404, detail="Tâche non trouvée")
await db.delete(task)Étape 9 : Point d'entrée principal
Créez app/main.py :
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import get_settings
from app.api import auth, projects, tasks
settings = get_settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Démarrage
print(f"🚀 {settings.APP_NAME} v{settings.APP_VERSION} en cours de démarrage...")
yield
# Arrêt
print("👋 Arrêt en cours...")
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Enregistrement des routes
app.include_router(auth.router)
app.include_router(projects.router)
app.include_router(tasks.router)
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": settings.APP_VERSION}Étape 10 : Migrations avec Alembic
Initialisez Alembic et configurez-le pour le mode asynchrone :
alembic init alembicModifiez alembic/env.py pour supporter l'async :
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.database import Base
from app.models.models import User, Project, Task # Importer tous les modèles
from app.core.config import get_settings
config = context.config
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()Créez et appliquez votre première migration :
alembic revision --autogenerate -m "tables initiales"
alembic upgrade headÉtape 11 : Conteneurisation avec Docker
Créez le Dockerfile :
FROM python:3.12-slim AS base
WORKDIR /app
# Dépendances système
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev && \
rm -rf /var/lib/apt/lists/*
# Dépendances Python
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Code applicatif
COPY . .
# Étape de production
FROM base AS production
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]Créez le docker-compose.yml :
services:
db:
image: postgres:16-alpine
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: taskflow
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
api:
build:
context: .
target: production
ports:
- "8000:8000"
environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
SECRET_KEY: ${SECRET_KEY:-changez-moi-en-production-svp}
DEBUG: "false"
depends_on:
db:
condition: service_healthy
restart: unless-stopped
migrate:
build:
context: .
target: base
command: alembic upgrade head
environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/taskflow
depends_on:
db:
condition: service_healthy
volumes:
postgres_data:Lancez le tout :
docker compose up -dAttendez un instant, puis vérifiez :
# Vérifier que les conteneurs tournent
docker compose ps
# Vérifier la santé de l'API
curl http://localhost:8000/healthVous devriez obtenir :
{"status": "healthy", "version": "1.0.0"}Rendez-vous sur http://localhost:8000/docs pour voir l'interface Swagger générée automatiquement. Vous pouvez tester tous les endpoints directement depuis votre navigateur !
Étape 12 : Tester votre API
Créez tests/test_api.py :
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
def anyio_backend():
return "asyncio"
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.mark.anyio
async def test_health_check(client: AsyncClient):
response = await client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
@pytest.mark.anyio
async def test_inscription_utilisateur(client: AsyncClient):
response = await client.post(
"/api/auth/register",
json={
"email": "test@example.com",
"username": "testuser",
"password": "motdepasse123securise",
},
)
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert data["username"] == "testuser"
assert "hashed_password" not in data
@pytest.mark.anyio
async def test_connexion_et_creation_projet(client: AsyncClient):
# Inscription
await client.post(
"/api/auth/register",
json={
"email": "dev@example.com",
"username": "developpeur",
"password": "motdepasse123securise",
},
)
# Connexion
login_response = await client.post(
"/api/auth/login",
data={"username": "dev@example.com", "password": "motdepasse123securise"},
)
token = login_response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# Création de projet
response = await client.post(
"/api/projects/",
json={"name": "Mon Projet", "description": "Un projet de test"},
headers=headers,
)
assert response.status_code == 201
assert response.json()["name"] == "Mon Projet"Lancez les tests :
pytest tests/ -vÉtape 13 : Conseils pour la mise en production
Avant de déployer en production, ajoutez ces améliorations :
1. Limitation du débit (Rate Limiting)
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, ...):
...2. Journalisation structurée
import structlog
logger = structlog.get_logger()
@app.middleware("http")
async def log_requests(request, call_next):
logger.info("requete_demarree", method=request.method, path=request.url.path)
response = await call_next(request)
logger.info("requete_terminee", status_code=response.status_code)
return response3. Vérification de santé avec la base de données
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
try:
await db.execute(text("SELECT 1"))
return {"status": "healthy", "database": "connected"}
except Exception:
return JSONResponse(
status_code=503,
content={"status": "unhealthy", "database": "disconnected"},
)4. Docker Compose spécifique à l'environnement
Créez un docker-compose.prod.yml pour surcharger les paramètres de production avec une gestion appropriée des secrets, des limites de ressources et du monitoring.
Résumé
Dans ce tutoriel, vous avez construit une API REST complète et prête pour la production avec :
- ✅ FastAPI — framework web Python asynchrone avec documentation OpenAPI automatique
- ✅ SQLAlchemy 2.0 — ORM moderne avec support complet du typage
- ✅ PostgreSQL — base de données relationnelle robuste
- ✅ Alembic — migrations versionnées de la base de données
- ✅ JWT — authentification sécurisée par tokens
- ✅ Docker Compose — conteneurs pour le développement et le déploiement
- ✅ Pydantic v2 — validation rapide des données
- ✅ Tests — tests asynchrones avec httpx et pytest
Prochaines étapes
- Ajouter la pagination aux endpoints de liste
- Implémenter des WebSockets pour les mises à jour en temps réel
- Mettre en place un pipeline CI/CD avec GitHub Actions
- Ajouter du cache Redis pour les requêtes fréquentes
- Déployer sur un service cloud (AWS ECS, GCP Cloud Run ou un VPS avec Docker)
La documentation générée automatiquement par FastAPI sur /docs et /redoc facilite énormément la collaboration avec les équipes frontend et les consommateurs de l'API — pas besoin de rédiger une documentation séparée.
Bon code ! 🚀
Discutez de votre projet avec nous
Nous sommes ici pour vous aider avec vos besoins en développement Web. Planifiez un appel pour discuter de votre projet et comment nous pouvons vous aider.
Trouvons les meilleures solutions pour vos besoins.
Articles connexes

Deployer une Application Next.js avec Docker et CI/CD en Production
Apprenez a containeriser votre application Next.js avec Docker, configurer un pipeline CI/CD avec GitHub Actions, et deployer en production sur un VPS. Guide complet du developpement au deploiement automatise.

Construire une Application Full-Stack avec Drizzle ORM et Next.js 15 : Base de Donnees Type-Safe du Zero a la Production
Apprenez a construire une application full-stack type-safe avec Drizzle ORM et Next.js 15. Ce tutoriel pratique couvre la conception de schemas, les migrations, les Server Actions, les operations CRUD et le deploiement avec PostgreSQL.

Construire des API REST avec Rust et Axum : guide pratique pour débutants
Apprenez à construire des API REST rapides et sûres avec Rust et le framework web Axum. Ce guide étape par étape couvre la configuration du projet, le routage, la gestion JSON, l'intégration base de données avec SQLx, la gestion des erreurs et les tests — de zéro à une API fonctionnelle.