FastAPI / 闯关模式

FastAPI 搭建标准的 API 服务

从目录、配置、路由、Schema、Model、鉴权和部署几个角度,把标准 API 服务搭起来。

这篇教程讲的是:如果你要从零搭建一个“标准的 API 后端服务”,应该有哪些目录、哪些文件、哪些基础能力,以及每个接口应该按什么套路写。

一句话:标准 API 服务不是“随便写几个接口函数”,而是要把入口、配置、路由、参数校验、数据库、登录鉴权、错误处理、日志、迁移、文档、部署这些东西整理成一套固定模板。

1. 什么是标准 API 服务

你可以把 API 服务想成一个“办事大厅”。

前端、小程序、App、第三方系统都会来这个大厅办事:

我要登录
我要查用户列表
我要新增订单
我要上传文件
我要查统计数据

一个不标准的后端,就像每个窗口都自己定规矩:

这个接口返回 code
那个接口返回 error
这个接口不校验参数
那个接口直接拼 SQL
这个接口忘了鉴权
那个接口出错只返回 500

后面会越来越乱。

一个标准 API 服务应该做到:

能力大白话
统一入口所有接口从一个应用启动
清晰路由每个业务模块一个路由文件
参数校验前端传错数据,后端能明确提示
数据库封装不到处乱建数据库连接
登录鉴权该登录的登录,该管理员的管理员
统一错误错了要讲清楚为什么错
自动文档接口写完能自动生成文档
数据库迁移表结构变化有记录,能重复部署
容器部署换机器也能快速跑起来

2. 一个请求在后端怎么流转

以这个请求为例:

GET /api/v1/users?page=1&page_size=10

标准流转通常是:

浏览器 / App / 小程序
  ↓
Nginx 或网关
  ↓
FastAPI 应用入口 main.py
  ↓
业务路由 users.py
  ↓
鉴权依赖 get_current_user
  ↓
数据库会话 get_db
  ↓
数据库模型 User
  ↓
响应 Schema UserListResponse
  ↓
返回 JSON

大白话:前端敲门,网关带路,路由分窗口,鉴权看证件,数据库取资料,Schema 整理格式,最后返回结果。

标准 FastAPI API 服务请求流

3. 推荐技术选型

这里用 Python + FastAPI 举例,因为它适合快速搭建标准 API 服务。

技术作用
Python 3.12+编程语言
FastAPIWeb API 框架
Uvicorn启动 FastAPI 的服务
Pydantic请求和响应数据校验
SQLAlchemy async异步 ORM,操作数据库
Alembic数据库迁移
PostgreSQL主数据库
Redis缓存、Token 黑名单、临时数据
python-joseJWT 登录令牌
passlib + bcrypt密码加密
Docker容器化部署
Nginx统一入口、反向代理

4. 推荐目录结构

推荐模板:

api-service/
├── app/
│   ├── __init__.py
│   ├── main.py                  # 应用入口
│   ├── api/
│   │   ├── __init__.py
│   │   └── v1/
│   │       ├── __init__.py
│   │       ├── auth.py          # 登录认证接口
│   │       └── users.py         # 用户模块接口
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py            # 配置管理
│   │   ├── database.py          # 数据库连接
│   │   ├── redis.py             # Redis 连接
│   │   └── security.py          # 密码和 JWT
│   ├── middleware/
│   │   ├── __init__.py
│   │   └── auth.py              # 当前用户、管理员校验
│   ├── models/
│   │   ├── __init__.py
│   │   ├── base.py              # 公共字段
│   │   └── user.py              # 用户表
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── auth.py              # 登录请求/响应格式
│   │   └── user.py              # 用户请求/响应格式
│   └── services/
│       ├── __init__.py
│       └── user_service.py      # 复杂业务逻辑,可选
├── alembic/
│   ├── env.py
│   └── versions/
├── alembic.ini
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── README.md

每层职责:

目录放什么不该放什么
api/v1/接口路由很复杂的业务算法
schemas/请求、响应格式数据库查询
models/数据库表结构HTTP 请求逻辑
core/配置、数据库、安全工具具体业务
middleware/登录、权限、请求前置处理页面业务
services/复杂业务逻辑路由装饰器
alembic/数据库迁移业务接口

5. 从零创建项目

5.1 创建目录

mkdir api-service
cd api-service

创建基础目录:

mkdir -p app/api/v1 app/core app/middleware app/models app/schemas app/services alembic/versions
touch app/__init__.py
touch app/api/__init__.py app/api/v1/__init__.py
touch app/core/__init__.py app/middleware/__init__.py
touch app/models/__init__.py app/schemas/__init__.py app/services/__init__.py

5.2 创建虚拟环境

python3 -m venv .venv
source .venv/bin/activate

Windows 可以用:

.venv\Scripts\activate

5.3 安装依赖

创建 requirements.txt

fastapi==0.115.6
uvicorn[standard]==0.32.1
sqlalchemy==2.0.36
alembic==1.14.0
asyncpg==0.30.0
pydantic[email]==2.10.3
pydantic-settings==2.7.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
bcrypt==4.2.1
python-multipart==0.0.20
redis==5.2.1

安装:

pip install -r requirements.txt

6. 配置管理怎么做

配置不要散落在代码里。

标准做法:统一放在 app/core/config.py

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    APP_NAME: str = "API Service"
    APP_ENV: str = "development"

    DATABASE_URL: str = "postgresql+asyncpg://postgres:password@localhost:5432/appdb"
    REDIS_URL: str = "redis://localhost:6379"

    JWT_SECRET: str = "change-me-to-a-long-random-secret"
    JWT_EXPIRE_DAYS: int = 7

    class Config:
        env_file = ".env"


settings = Settings()

创建 .env.example

APP_NAME=API Service
APP_ENV=development
DATABASE_URL=postgresql+asyncpg://postgres:password@localhost:5432/appdb
REDIS_URL=redis://localhost:6379
JWT_SECRET=change-me-to-a-long-random-secret
JWT_EXPIRE_DAYS=7

真正本地运行时复制:

cp .env.example .env

重点规则:

配置要不要提交
.env.example可以提交,放示例值
.env不要提交,放真实密码
生产 JWT_SECRET不能用默认值
数据库密码不能写死在代码里

7. 数据库连接怎么做

文件:

app/core/database.py

代码:

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from app.core.config import settings


engine = create_async_engine(settings.DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

大白话:

  1. engine 是数据库发动机。
  2. AsyncSessionLocal 是会话工厂。
  3. get_db() 是每个接口借数据库连接的地方。
  4. 正常结束就提交,出错就回滚。

以后接口里这样用:

db: AsyncSession = Depends(get_db)

8. 数据表模型怎么写

8.1 公共字段

文件:

app/models/base.py

代码:

import datetime
from sqlalchemy import DateTime, func
from sqlalchemy.orm import Mapped, mapped_column


class TimestampMixin:
    created_at: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

以后普通业务表都继承它,这样自动有创建时间和更新时间。

8.2 用户表

文件:

app/models/user.py

代码:

from sqlalchemy import Boolean, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base
from app.models.base import TimestampMixin


class User(Base, TimestampMixin):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    email: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    password_hash: Mapped[str] = mapped_column(String(200), nullable=False)
    is_admin: Mapped[bool] = mapped_column(Boolean, default=False)
    status: Mapped[bool] = mapped_column(Boolean, default=True)

注意:

字段意思
username登录名
email邮箱
password_hash加密后的密码,不存明文
is_admin是否管理员
status账号是否启用

9. 请求和响应 Schema 怎么写

Schema 是接口的“材料清单”。

文件:

app/schemas/user.py

代码:

from datetime import datetime
from pydantic import BaseModel, EmailStr


class UserBase(BaseModel):
    username: str
    email: EmailStr


class UserCreate(UserBase):
    password: str
    is_admin: bool = False


class UserUpdate(BaseModel):
    username: str | None = None
    email: EmailStr | None = None
    password: str | None = None
    is_admin: bool | None = None
    status: bool | None = None


class UserResponse(UserBase):
    id: int
    is_admin: bool
    status: bool
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class UserListResponse(BaseModel):
    total: int
    items: list[UserResponse]

几个关键点:

写法意思
UserCreate新增用户时前端要传什么
UserUpdate修改用户时前端可以传什么
UserResponse返回给前端什么
model_config = {"from_attributes": True}允许从 SQLAlchemy 对象转成响应

登录 Schema:

文件:

app/schemas/auth.py
from pydantic import BaseModel


class LoginRequest(BaseModel):
    username: str
    password: str


class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"

10. 路由接口怎么写

10.1 应用入口

文件:

app/main.py

代码:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1 import auth, users
from app.core.config import settings


app = FastAPI(title=settings.APP_NAME, version="1.0.0", docs_url="/docs")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(auth.router, prefix="/api/v1/auth", tags=["认证"])
app.include_router(users.router, prefix="/api/v1/users", tags=["用户管理"])


@app.get("/health")
async def health():
    return {"status": "ok"}

10.2 用户接口

文件:

app/api/v1/users.py

代码:

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import hash_password
from app.middleware.auth import get_current_user, require_admin
from app.models.user import User
from app.schemas.user import UserCreate, UserListResponse, UserResponse, UserUpdate


router = APIRouter()


@router.get("", response_model=UserListResponse)
async def list_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    keyword: str | None = Query(None),
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(get_current_user),
):
    query = select(User)
    count_query = select(func.count()).select_from(User)

    if keyword:
        query = query.where(User.username.ilike(f"%{keyword}%"))
        count_query = count_query.where(User.username.ilike(f"%{keyword}%"))

    total = (await db.execute(count_query)).scalar() or 0
    result = await db.execute(
        query.order_by(User.id.desc()).offset((page - 1) * page_size).limit(page_size)
    )

    return UserListResponse(total=total, items=list(result.scalars().all()))


@router.post("", response_model=UserResponse)
async def create_user(
    body: UserCreate,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    existing = (
        await db.execute(select(User).where(User.username == body.username))
    ).scalar_one_or_none()
    if existing:
        raise HTTPException(status_code=400, detail="用户名已存在")

    user = User(
        username=body.username,
        email=body.email,
        password_hash=hash_password(body.password),
        is_admin=body.is_admin,
    )
    db.add(user)
    await db.flush()
    await db.refresh(user)
    return user


@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    body: UserUpdate,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    user = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    data = body.model_dump(exclude_unset=True)
    password = data.pop("password", None)
    if password:
        data["password_hash"] = hash_password(password)

    for key, value in data.items():
        setattr(user, key, value)

    await db.flush()
    await db.refresh(user)
    return user


@router.delete("/{user_id}")
async def delete_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    user = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    await db.delete(user)
    return {"message": "删除成功"}

这里要特别注意:

password = data.pop("password", None)

意思是:password 是接口入参字段,不是数据库表字段。更新时要先把它拿出来,变成 password_hash 后再保存,不能直接当成普通字段写回用户对象。

这就是一个标准 CRUD 模块。

CRUD 大白话:

操作HTTP 方法用途
CreatePOST新增
ReadGET查询
UpdatePUT / PATCH修改
DeleteDELETE删除

11. 登录和鉴权怎么做

11.1 密码和 JWT 工具

文件:

app/core/security.py

代码:

from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(data: dict) -> str:
    payload = data.copy()
    payload["exp"] = datetime.utcnow() + timedelta(days=settings.JWT_EXPIRE_DAYS)
    return jwt.encode(payload, settings.JWT_SECRET, algorithm="HS256")


def decode_access_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError:
        return {}

11.2 当前用户依赖

文件:

app/middleware/auth.py

代码:

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.user import User


bearer_scheme = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = decode_access_token(token)
    user_id = payload.get("sub")

    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的 Token",
        )

    user = (
        await db.execute(select(User).where(User.id == int(user_id)))
    ).scalar_one_or_none()

    if not user or not user.status:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在或已禁用",
        )

    return user


async def require_admin(current_user: User = Depends(get_current_user)) -> User:
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="需要管理员权限",
        )
    return current_user

11.3 登录接口

文件:

app/api/v1/auth.py

代码:

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import create_access_token, verify_password
from app.middleware.auth import get_current_user
from app.models.user import User
from app.schemas.auth import LoginRequest, TokenResponse
from app.schemas.user import UserResponse


router = APIRouter()


@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest, db: AsyncSession = Depends(get_db)):
    user = (
        await db.execute(select(User).where(User.username == body.username))
    ).scalar_one_or_none()

    if not user or not verify_password(body.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
        )

    if not user.status:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="账号已禁用",
        )

    token = create_access_token({"sub": str(user.id)})
    return TokenResponse(access_token=token)


@router.get("/me", response_model=UserResponse)
async def me(current_user: User = Depends(get_current_user)):
    return current_user

前端调用登录接口后,把 access_token 存起来。后续请求带:

Authorization: Bearer <access_token>

12. 数据库迁移怎么做

12.1 初始化 Alembic

第一次创建项目时:

alembic init alembic

它会生成:

alembic/
alembic.ini

12.2 配置 env.py

打开:

alembic/env.py

核心要点是:

  1. 使用 settings.DATABASE_URL
  2. 使用 Base.metadata
  3. 导入所有模型。

示意代码:

import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from app.core.config import settings
from app.core.database import Base
from app.models import user  # noqa


config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online():
    connectable = create_async_engine(settings.DATABASE_URL, poolclass=pool.NullPool)
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


asyncio.run(run_migrations_online())

12.3 生成迁移

每次新增表或字段:

alembic revision --autogenerate -m "add users table"

执行迁移:

alembic upgrade head

回滚上一步:

alembic downgrade -1

重要提醒:生成迁移后一定要打开文件看一眼。不要完全相信自动生成。

13. 统一错误、日志和返回格式

13.1 错误处理

业务错误用:

raise HTTPException(status_code=404, detail="用户不存在")

不要这样:

return {"error": "用户不存在"}

原因:

写法问题
return {"error": ...}HTTP 状态码可能还是 200,前端不好判断
raise HTTPException(...)状态码和错误原因都清楚

13.2 日志

简单项目可以先用 Python 自带 logging:

import logging

logger = logging.getLogger(__name__)

logger.info("user created")
logger.warning("token invalid")
logger.exception("database error")

日志里不要打印:

不要打印原因
明文密码泄露风险
完整 Token泄露风险
身份证、手机号等敏感信息合规风险

13.3 返回格式要不要统一包一层

有两种常见风格。

第一种:直接返回数据。

{
  "id": 1,
  "username": "admin"
}

第二种:统一包一层。

{
  "code": 0,
  "message": "ok",
  "data": {
    "id": 1,
    "username": "admin"
  }
}

两种都可以,关键是一个项目里要统一。

如果是前后端都自己控制的后台系统,统一包一层会方便前端拦截。
如果是更 REST 风格的公开 API,直接返回数据也很常见。

14. Docker 部署模板

14.1 Dockerfile

文件:

Dockerfile

内容:

FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["sh", "-c", "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"]

大白话:镜像启动时,先升级数据库,再启动服务。

14.2 docker-compose.yml

文件:

docker-compose.yml

内容:

services:
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: appdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${DB_PASS:-password}
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 10

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASS:-password}

  api:
    build: .
    environment:
      DATABASE_URL: postgresql+asyncpg://postgres:${DB_PASS:-password}@db:5432/appdb
      REDIS_URL: redis://:${REDIS_PASS:-password}@redis:6379
      JWT_SECRET: ${JWT_SECRET:-change-me-to-a-long-random-secret}
      APP_ENV: production
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    ports:
      - "8000:8000"

volumes:
  pgdata:

启动:

docker compose up -d --build

看日志:

docker compose logs -f api

停止:

docker compose down

15. 新增一个业务模块的完整步骤

假设新增“文章 articles”模块。

第一步:建模型

文件:

app/models/article.py
from sqlalchemy import Boolean, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base
from app.models.base import TimestampMixin


class Article(Base, TimestampMixin):
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(120), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    published: Mapped[bool] = mapped_column(Boolean, default=False)

第二步:让 Alembic 认识模型

alembic/env.py 里加:

from app.models import article  # noqa

第三步:写 Schema

文件:

app/schemas/article.py
from datetime import datetime
from pydantic import BaseModel


class ArticleCreate(BaseModel):
    title: str
    content: str
    published: bool = False


class ArticleUpdate(BaseModel):
    title: str | None = None
    content: str | None = None
    published: bool | None = None


class ArticleResponse(BaseModel):
    id: int
    title: str
    content: str
    published: bool
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class ArticleListResponse(BaseModel):
    total: int
    items: list[ArticleResponse]

第四步:写路由

文件:

app/api/v1/articles.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.middleware.auth import get_current_user, require_admin
from app.models.article import Article
from app.schemas.article import ArticleCreate, ArticleListResponse, ArticleResponse, ArticleUpdate


router = APIRouter()


@router.get("", response_model=ArticleListResponse)
async def list_articles(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    total = (await db.execute(select(func.count()).select_from(Article))).scalar() or 0
    result = await db.execute(
        select(Article).order_by(Article.id.desc()).offset((page - 1) * page_size).limit(page_size)
    )
    return ArticleListResponse(total=total, items=list(result.scalars().all()))


@router.post("", response_model=ArticleResponse)
async def create_article(
    body: ArticleCreate,
    db: AsyncSession = Depends(get_db),
    _=Depends(require_admin),
):
    article = Article(**body.model_dump())
    db.add(article)
    await db.flush()
    await db.refresh(article)
    return article


@router.put("/{article_id}", response_model=ArticleResponse)
async def update_article(
    article_id: int,
    body: ArticleUpdate,
    db: AsyncSession = Depends(get_db),
    _=Depends(require_admin),
):
    article = (
        await db.execute(select(Article).where(Article.id == article_id))
    ).scalar_one_or_none()
    if not article:
        raise HTTPException(status_code=404, detail="文章不存在")

    for key, value in body.model_dump(exclude_unset=True).items():
        setattr(article, key, value)

    await db.flush()
    await db.refresh(article)
    return article


@router.delete("/{article_id}")
async def delete_article(
    article_id: int,
    db: AsyncSession = Depends(get_db),
    _=Depends(require_admin),
):
    article = (
        await db.execute(select(Article).where(Article.id == article_id))
    ).scalar_one_or_none()
    if not article:
        raise HTTPException(status_code=404, detail="文章不存在")

    await db.delete(article)
    return {"message": "删除成功"}

第五步:注册路由

app/main.py

from app.api.v1 import articles

app.include_router(articles.router, prefix="/api/v1/articles", tags=["文章管理"])

第六步:生成迁移

alembic revision --autogenerate -m "add articles table"

检查迁移文件后执行:

alembic upgrade head

第七步:启动验证

uvicorn app.main:app --reload --port 8000

打开:

http://localhost:8000/docs

检查是否出现“文章管理”。

16. 最后检查清单

每次搭建或新增接口,都按这个清单走:

[ ] 1. 项目有清晰目录:api / schemas / models / core / services
[ ] 2. 配置统一从 .env 读取
[ ] 3. 数据库连接统一通过 get_db 注入
[ ] 4. 密码只存 hash,不存明文
[ ] 5. 登录成功返回 JWT
[ ] 6. 需要登录的接口使用 get_current_user
[ ] 7. 管理员接口使用 require_admin
[ ] 8. 请求体使用 Pydantic Schema 校验
[ ] 9. 响应使用 response_model 声明
[ ] 10. 列表接口有分页
[ ] 11. 业务错误使用 HTTPException
[ ] 12. 数据表变化通过 Alembic 迁移
[ ] 13. 新模型已导入 alembic/env.py
[ ] 14. /health 能正常返回
[ ] 15. /docs 能看到接口文档
[ ] 16. Docker 能正常构建和启动
[ ] 17. 生产环境 JWT_SECRET、数据库密码不是默认值

17. 最后总结

一个标准 API 服务模板,可以记成这条线:

main.py 创建应用
  ↓
api/v1 注册业务路由
  ↓
schemas 校验请求和响应
  ↓
middleware 做登录鉴权
  ↓
core/database 提供数据库会话
  ↓
models 映射数据库表
  ↓
alembic 管理数据库变化
  ↓
Docker 负责部署

以后你新增接口时,不要只问“接口函数怎么写”,要一起想清楚:

路径怎么命名
参数怎么校验
返回什么格式
谁能访问
数据存在哪张表
表结构怎么迁移
前端怎么联调
线上怎么部署

这就是“标准 API 服务模板”的核心。