数据库 / 已完成
FastAPI 连接数据库并实现增删改查
学习 SQLAlchemy async、数据库连接、Model、Schema 和完整 CRUD 接口。
返回学习路线这篇教程讲的是:FastAPI 怎么连接数据库,并写出真正能用的“增删改查”接口。
一句话:FastAPI 负责接请求,SQLAlchemy 负责和数据库说话,Pydantic 负责检查数据,数据库负责真正保存数据。
目录
- 为什么接口服务一定要学数据库
- 数据库连接这件事到底在干嘛
- 本文使用的技术组合
- 推荐项目结构
- 安装依赖
- 配置数据库地址
- 创建数据库连接
- 创建数据表模型
- 创建请求和响应 Schema
- 写增删改查接口
- 启动服务并测试
- 增删改查背后的固定套路
- 常见错误和排查方法
- 学完后下一步学什么
1. 为什么接口服务一定要学数据库
如果一个后端接口没有数据库,它就像一个只会临时记事的人。
你请求它:
新增一个用户:小明
它可能当时记住了,但服务一重启,数据就没了。
数据库的作用就是:把重要数据长期保存下来。
比如:
| 业务 | 数据库存什么 |
|---|---|
| 用户系统 | 用户名、邮箱、密码 hash |
| 文章系统 | 标题、内容、发布时间 |
| 订单系统 | 商品、金额、支付状态 |
| 后台系统 | 角色、权限、菜单 |
所以后端开发真正开始做业务时,一定绕不开数据库。
2. 数据库连接这件事到底在干嘛
你可以把数据库想成一个“资料室”。
FastAPI 是办事窗口,SQLAlchemy 是窗口工作人员,数据库是资料室。
一次查询大概是:
用户请求接口
↓
FastAPI 收到请求
↓
SQLAlchemy 拿一张临时通行证
↓
去数据库查资料
↓
整理成 JSON
↓
返回给前端
这里的“临时通行证”就是数据库会话,也就是 Session。
为什么不能每个接口自己乱连数据库?
因为这样会很乱:
- 有的接口忘了关闭连接。
- 有的接口出错后没有回滚。
- 数据库连接越来越多,服务会变慢甚至挂掉。
- 后面不好统一维护。
标准做法是:统一创建数据库连接,接口通过依赖注入使用它。
3. 本文使用的技术组合
这篇用一套比较标准的后端组合:
| 技术 | 作用 | 大白话 |
|---|---|---|
| FastAPI | 写接口 | 接待请求 |
| Uvicorn | 启动服务 | 发动机 |
| PostgreSQL | 数据库 | 资料室 |
| SQLAlchemy 2.x async | 操作数据库 | 用 Python 写数据库操作 |
| asyncpg | PostgreSQL 异步驱动 | SQLAlchemy 和 PostgreSQL 之间的插头 |
| Pydantic | 校验数据 | 检查前端传来的材料 |
如果你只是练习,也可以先用 SQLite。
但如果你想搭标准服务,建议直接用 PostgreSQL。
4. 推荐项目结构
先建一个最小但标准的结构:
fastapi-crud-demo/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── api/
│ │ ├── __init__.py
│ │ └── v1/
│ │ ├── __init__.py
│ │ └── users.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py
│ │ └── database.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── user.py
│ └── schemas/
│ ├── __init__.py
│ └── user.py
├── requirements.txt
└── .env.example
每层做什么:
| 目录 | 用途 |
|---|---|
api/v1/ | 放接口路由 |
core/ | 放配置、数据库连接 |
models/ | 放数据库表模型 |
schemas/ | 放请求和响应格式 |
main.py | 应用入口,注册路由 |
大白话:路由管“门牌号”,Schema 管“材料格式”,Model 管“数据库表”,core 管“基础设施”。
5. 安装依赖
创建项目:
mkdir fastapi-crud-demo
cd fastapi-crud-demo
创建虚拟环境:
python3 -m venv .venv
source .venv/bin/activate
Windows:
.venv\Scripts\activate
创建 requirements.txt:
fastapi==0.115.6
uvicorn[standard]==0.32.1
sqlalchemy==2.0.36
asyncpg==0.30.0
pydantic[email]==2.10.3
pydantic-settings==2.7.0
安装:
pip install -r requirements.txt
创建目录:
mkdir -p app/api/v1 app/core app/models app/schemas
touch app/__init__.py
touch app/api/__init__.py app/api/v1/__init__.py
touch app/core/__init__.py app/models/__init__.py app/schemas/__init__.py
6. 配置数据库地址
创建:
app/core/config.py
代码:
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "FastAPI CRUD Demo"
DATABASE_URL: str = "postgresql+asyncpg://postgres:password@localhost:5432/appdb"
class Config:
env_file = ".env"
settings = Settings()
创建 .env.example:
APP_NAME=FastAPI CRUD Demo
DATABASE_URL=postgresql+asyncpg://postgres:password@localhost:5432/appdb
本地开发时复制:
cp .env.example .env
然后把 .env 改成你自己的数据库地址。
数据库地址长这样:
postgresql+asyncpg://用户名:密码@主机:端口/数据库名
比如:
postgresql+asyncpg://postgres:password@localhost:5432/appdb
拆开看:
| 部分 | 意思 |
|---|---|
postgresql+asyncpg | 使用 PostgreSQL,并通过 asyncpg 连接 |
postgres | 用户名 |
password | 密码 |
localhost | 数据库所在机器 |
5432 | PostgreSQL 默认端口 |
appdb | 数据库名 |
7. 创建数据库连接
创建:
app/core/database.py
代码:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from app.core.config import settings
engine = create_async_engine(settings.DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
这段代码是数据库部分最重要的基础。
逐句解释:
| 代码 | 大白话 |
|---|---|
create_async_engine(...) | 创建数据库发动机 |
async_sessionmaker(...) | 创建“会话工厂” |
Base | 所有数据库表模型的共同父类 |
get_db() | 给接口提供数据库会话 |
commit() | 操作成功后提交 |
rollback() | 出错后撤销 |
接口里以后这样拿数据库:
db: AsyncSession = Depends(get_db)
你可以理解成:这个接口需要数据库,请 FastAPI 自动帮我递一张“数据库通行证”。
8. 创建数据表模型
8.1 公共时间字段
创建:
app/models/base.py
代码:
import datetime
from sqlalchemy import DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
class TimestampMixin:
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
这个类的作用是:让每张表都有创建时间和更新时间。
8.2 用户表模型
创建:
app/models/user.py
代码:
from sqlalchemy import Boolean, Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base
from app.models.base import TimestampMixin
class User(Base, TimestampMixin):
__tablename__ = "users"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
email: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
status: Mapped[bool] = mapped_column(Boolean, default=True)
这就是一张用户表。
| 字段 | 意思 |
|---|---|
id | 主键,自增 |
username | 用户名,唯一 |
email | 邮箱,唯一 |
status | 是否启用 |
created_at | 创建时间,从 TimestampMixin 来 |
updated_at | 更新时间,从 TimestampMixin 来 |
9. 创建请求和响应 Schema
创建:
app/schemas/user.py
代码:
from datetime import datetime
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
username: str
email: EmailStr
class UserCreate(UserBase):
status: bool = True
class UserUpdate(BaseModel):
username: str | None = None
email: EmailStr | None = None
status: bool | None = None
class UserResponse(UserBase):
id: int
status: bool
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
class UserListResponse(BaseModel):
total: int
items: list[UserResponse]
大白话解释:
| Schema | 用途 |
|---|---|
UserCreate | 新增用户时,前端要传什么 |
UserUpdate | 修改用户时,前端可以传什么 |
UserResponse | 返回给前端什么 |
UserListResponse | 用户列表返回什么 |
重点是这一行:
model_config = {"from_attributes": True}
它的意思是:允许 Pydantic 从 SQLAlchemy 的数据库对象里读取字段,然后转成接口响应。
不写它,很多时候会出现“明明查到了数据,但响应模型转换失败”的问题。
10. 写增删改查接口
创建:
app/api/v1/users.py
完整代码:
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserListResponse, UserResponse, UserUpdate
router = APIRouter()
@router.get("", response_model=UserListResponse)
async def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100),
keyword: str | None = Query(None),
db: AsyncSession = Depends(get_db),
):
query = select(User)
count_query = select(func.count()).select_from(User)
if keyword:
query = query.where(User.username.ilike(f"%{keyword}%"))
count_query = count_query.where(User.username.ilike(f"%{keyword}%"))
total = (await db.execute(count_query)).scalar() or 0
result = await db.execute(
query.order_by(User.id.desc()).offset((page - 1) * page_size).limit(page_size)
)
users = list(result.scalars().all())
return UserListResponse(total=total, items=users)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
):
user = (
await db.execute(select(User).where(User.id == user_id))
).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
@router.post("", response_model=UserResponse)
async def create_user(
body: UserCreate,
db: AsyncSession = Depends(get_db),
):
user = User(**body.model_dump())
db.add(user)
try:
await db.flush()
await db.refresh(user)
except IntegrityError:
raise HTTPException(status_code=400, detail="用户名或邮箱已存在")
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
body: UserUpdate,
db: AsyncSession = Depends(get_db),
):
user = (
await db.execute(select(User).where(User.id == user_id))
).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
for key, value in body.model_dump(exclude_unset=True).items():
setattr(user, key, value)
try:
await db.flush()
await db.refresh(user)
except IntegrityError:
raise HTTPException(status_code=400, detail="用户名或邮箱已存在")
return user
@router.delete("/{user_id}")
async def delete_user(
user_id: int,
db: AsyncSession = Depends(get_db),
):
user = (
await db.execute(select(User).where(User.id == user_id))
).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
await db.delete(user)
return {"message": "删除成功"}
这里面已经有完整的 CRUD。
| 接口 | 方法 | 路径 | 作用 |
|---|---|---|---|
| 列表 | GET | /api/v1/users | 查询用户列表 |
| 详情 | GET | /api/v1/users/{user_id} | 查询单个用户 |
| 新增 | POST | /api/v1/users | 新增用户 |
| 修改 | PUT | /api/v1/users/{user_id} | 修改用户 |
| 删除 | DELETE | /api/v1/users/{user_id} | 删除用户 |
11. 创建应用入口
创建:
app/main.py
代码:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1 import users
from app.core.config import settings
app = FastAPI(title=settings.APP_NAME, version="1.0.0", docs_url="/docs")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(users.router, prefix="/api/v1/users", tags=["用户管理"])
@app.get("/health")
async def health():
return {"status": "ok"}
到这里,路由才真正注册进 FastAPI。
如果你写了 users.py,但 /docs 看不到接口,第一反应就应该检查 main.py 有没有:
app.include_router(...)
12. 创建数据库表
正式项目推荐用 Alembic 管理表结构。
但这篇先讲增删改查,所以给一个最简单的练习方式:启动时自动建表。
修改 app/main.py:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1 import users
from app.core.config import settings
from app.core.database import Base, engine
from app.models import user
@asynccontextmanager
async def lifespan(app: FastAPI):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
app = FastAPI(
title=settings.APP_NAME,
version="1.0.0",
docs_url="/docs",
lifespan=lifespan,
)
注意:
from app.models import user
这行看起来没用,其实很重要。它的作用是让 Python 加载 User 模型,这样 Base.metadata.create_all 才知道要创建 users 表。
初学练习可以这样做。
但正式项目建议改用:
Alembic 数据库迁移
因为自动建表适合练习,不适合长期维护正式系统。
13. 启动服务并测试
13.1 启动 PostgreSQL
如果你本地没有 PostgreSQL,可以用 Docker 快速启动:
docker run -d \
--name fastapi-postgres \
-e POSTGRES_DB=appdb \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=password \
-p 5432:5432 \
postgres:16-alpine
确认 .env:
DATABASE_URL=postgresql+asyncpg://postgres:password@localhost:5432/appdb
13.2 启动 FastAPI
uvicorn app.main:app --reload --port 8000
检查健康接口:
http://localhost:8000/health
接口文档:
http://localhost:8000/docs
13.3 测试新增用户
请求:
curl -X POST http://localhost:8000/api/v1/users \
-H "Content-Type: application/json" \
-d '{"username":"xiaoming","email":"xiaoming@example.com","status":true}'
返回类似:
{
"username": "xiaoming",
"email": "xiaoming@example.com",
"id": 1,
"status": true,
"created_at": "2026-04-29T10:00:00Z",
"updated_at": "2026-04-29T10:00:00Z"
}
13.4 测试列表
curl "http://localhost:8000/api/v1/users?page=1&page_size=10"
返回:
{
"total": 1,
"items": [
{
"username": "xiaoming",
"email": "xiaoming@example.com",
"id": 1,
"status": true,
"created_at": "2026-04-29T10:00:00Z",
"updated_at": "2026-04-29T10:00:00Z"
}
]
}
13.5 测试详情
curl http://localhost:8000/api/v1/users/1
13.6 测试修改
curl -X PUT http://localhost:8000/api/v1/users/1 \
-H "Content-Type: application/json" \
-d '{"email":"new@example.com"}'
13.7 测试删除
curl -X DELETE http://localhost:8000/api/v1/users/1
14. 增删改查背后的固定套路
14.1 查询列表
固定套路:
接收 page/page_size
↓
拼查询条件
↓
查 total
↓
查当前页数据
↓
返回 total + items
核心代码:
total = (await db.execute(count_query)).scalar() or 0
result = await db.execute(query.offset(...).limit(...))
14.2 查询详情
固定套路:
根据 id 查询
↓
查不到就 404
↓
查到就返回
核心代码:
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
14.3 新增数据
固定套路:
Schema 校验请求体
↓
创建 Model 对象
↓
db.add()
↓
flush()
↓
refresh()
↓
返回新对象
为什么要 flush()?
大白话:先把新增操作送到数据库,让数据库生成 id。
为什么要 refresh()?
大白话:把数据库生成的新字段,比如 id、created_at,重新拿回来。
14.4 修改数据
固定套路:
根据 id 查询
↓
查不到就 404
↓
只更新前端传来的字段
↓
flush()
↓
refresh()
↓
返回新对象
重点:
body.model_dump(exclude_unset=True)
意思是:只拿前端真的传了的字段。
如果不加 exclude_unset=True,前端没传的字段可能会变成 None,很容易误伤数据。
14.5 删除数据
固定套路:
根据 id 查询
↓
查不到就 404
↓
db.delete()
↓
返回删除成功
15. 常见错误和排查方法
15.1 ModuleNotFoundError: No module named 'app'
原因:启动命令位置不对。
你应该在项目根目录执行:
uvicorn app.main:app --reload --port 8000
项目根目录就是能看到 app/ 文件夹的地方。
15.2 asyncpg.exceptions.InvalidCatalogNameError
意思:数据库不存在。
比如你的连接地址里写了:
appdb
但 PostgreSQL 里没有这个库。
解决:创建数据库,或者检查 Docker 启动命令里的:
-e POSTGRES_DB=appdb
15.3 /docs 里看不到 users 接口
检查 app/main.py 有没有:
from app.api.v1 import users
app.include_router(users.router, prefix="/api/v1/users", tags=["用户管理"])
15.4 新增用户时报唯一约束错误
比如用户名重复、邮箱重复。
代码里应该捕获:
except IntegrityError:
raise HTTPException(status_code=400, detail="用户名或邮箱已存在")
15.5 修改数据时字段被改成空
检查是否使用:
body.model_dump(exclude_unset=True)
不要直接:
body.model_dump()
否则没传的字段也可能被拿出来。
15.6 返回对象时报 Pydantic 错误
检查响应 Schema 有没有:
model_config = {"from_attributes": True}
15.7 服务重启后表没创建
如果你用的是 create_all,确认:
from app.models import user
有没有被导入。
如果你用 Alembic,确认有没有执行:
alembic upgrade head
16. 练习任务
学完用户 CRUD 后,可以自己做一个文章模块。
表字段:
| 字段 | 类型 | 意思 |
|---|---|---|
id | int | 主键 |
title | str | 标题 |
content | str | 内容 |
published | bool | 是否发布 |
created_at | datetime | 创建时间 |
updated_at | datetime | 更新时间 |
接口:
| 方法 | 路径 | 作用 |
|---|---|---|
GET | /api/v1/articles | 文章列表 |
GET | /api/v1/articles/{article_id} | 文章详情 |
POST | /api/v1/articles | 新增文章 |
PUT | /api/v1/articles/{article_id} | 修改文章 |
DELETE | /api/v1/articles/{article_id} | 删除文章 |
如果你能独立写出文章 CRUD,就说明数据库接口这一步真的入门了。
17. 学完后下一步学什么
建议下一篇学:
大白话讲解——FastAPI 使用 Alembic 管理数据库表结构
原因:这篇为了让你先跑通 CRUD,用了 Base.metadata.create_all 自动建表。
但正式项目不能长期靠自动建表,应该用 Alembic 记录每一次数据库结构变化。
学习路线:
- 先会连接数据库。
- 再会写 CRUD。
- 再学 Alembic 迁移。
- 再学登录鉴权。
- 再学 service/repository 分层。
- 最后学部署和监控。
18. 总结表
| 名词 | 大白话 |
|---|---|
| Engine | 数据库发动机 |
| Session | 一次数据库操作的通行证 |
| Model | 数据库表在 Python 里的样子 |
| Schema | 接口请求和响应的格式 |
| CRUD | 增删改查 |
select() | 查询 |
db.add() | 新增 |
setattr() | 修改字段 |
db.delete() | 删除 |
flush() | 把操作先送到数据库 |
refresh() | 把数据库生成的新值拿回来 |
commit() | 确认保存 |
rollback() | 出错撤销 |
最后记住这条线:
前端请求
↓
FastAPI 路由
↓
Pydantic 校验参数
↓
SQLAlchemy 操作数据库
↓
Pydantic 整理响应
↓
返回 JSON
这就是 FastAPI 连接数据库并实现增删改查的核心。