Appearance
第 5 章:数据库基础
无论是 RPA 自动化中的数据记录,还是 Web 后端的数据持久化,数据库都是不可或缺的基础设施。作为前端开发者,你可能熟悉 IndexedDB 或 localStorage,Python 生态中的数据库工具同样丰富且强大。本章将从零开始,带你掌握 SQLite 嵌入式数据库、SQLAlchemy ORM 以及 pandas 数据交互。
5.1 为什么需要数据库
在 RPA 和自动化场景中,数据通常需要:
- 持久化存储:脚本重启后数据不丢失
- 结构化查询:按条件快速检索、过滤、聚合
- 关系管理:多表关联,避免数据冗余
- 并发安全:多任务同时读写时保证数据一致性
前端类比
- SQLite ≈ 浏览器中的 IndexedDB(本地嵌入式数据库)
- SQLAlchemy ORM ≈ Prisma / TypeORM(用对象操作替代手写 SQL)
- pandas + SQL ≈ 用 JavaScript 数组方法处理从 API 获取的数据后存入数据库
5.2 SQLite —— 零配置的本地数据库
SQLite 是 Python 标准库内置的数据库引擎,无需单独安装服务器,一个文件就是一个数据库。它是 RPA 自动化中存储本地数据的首选方案。
基础 CRUD 操作
python
"""SQLite 基础操作 - 轻量级本地数据库.
无需安装第三方库,Python 标准库直接可用.
"""
import sqlite3
from pathlib import Path
# 连接数据库(如果不存在则自动创建)
# 传入 ':memory:' 可创建内存数据库(重启后数据丢失)
db_path = Path("data/my_rpa.db")
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# ========== 创建表 ==========
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
status TEXT DEFAULT 'pending',
priority INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
)
""")
# ========== 插入数据(Create)==========
tasks = [
("自动整理下载文件夹", "pending", 2),
("发送日报邮件", "pending", 3),
("备份数据库", "completed", 1),
]
cursor.executemany(
"INSERT INTO tasks (name, status, priority) VALUES (?, ?, ?)",
tasks
)
conn.commit() # 必须 commit 才能持久化
# ========== 查询数据(Read)==========
# 查询所有任务
cursor.execute("SELECT * FROM tasks")
rows = cursor.fetchall()
for row in rows:
print(row)
# 条件查询:只查未完成的优先级 >= 2 的任务
cursor.execute(
"SELECT id, name, priority FROM tasks WHERE status = ? AND priority >= ? ORDER BY priority DESC",
("pending", 2)
)
pending_tasks = cursor.fetchall()
print(f"\n高优先级待办任务: {pending_tasks}")
# ========== 更新数据(Update)==========
cursor.execute(
"UPDATE tasks SET status = ?, completed_at = CURRENT_TIMESTAMP WHERE name = ?",
("completed", "自动整理下载文件夹")
)
conn.commit()
# ========== 删除数据(Delete)==========
cursor.execute("DELETE FROM tasks WHERE status = ?", ("completed",))
conn.commit()
print(f"删除了 {cursor.rowcount} 条已完成任务")
# 关闭连接
cursor.close()
conn.close()Row Factory:让查询结果更像字典
默认的 fetchall() 返回元组列表,通过 row_factory 可以转为字典,访问更直观:
python
import sqlite3
conn = sqlite3.connect("data/my_rpa.db")
conn.row_factory = sqlite3.Row # 让每一行支持字典式访问
cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks WHERE status = ?", ("pending",))
for row in cursor.fetchall():
# 现在可以像字典一样访问
print(f"任务: {row['name']}, 优先级: {row['priority']}")
conn.close()上下文管理器(自动提交和关闭)
python
import sqlite3
# 使用上下文管理器,自动处理 commit/rollback/close
with sqlite3.connect("data/my_rpa.db") as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO tasks (name, status) VALUES (?, ?)", ("新任务", "pending"))
# 退出 with 块时自动 commitSQLite 适用场景
- RPA 自动化中的本地数据记录(日志、任务状态、配置)
- 小到中型 Web 应用的数据层
- 原型开发阶段快速验证数据模型
- 单用户桌面应用的数据存储
5.3 SQLAlchemy —— Python 的 Prisma
SQLAlchemy 是 Python 最成熟的 ORM(对象关系映射)库,让你用操作对象的方式替代手写 SQL。对于熟悉 Prisma 或 TypeORM 的前端开发者,概念几乎一致。
安装
bash
pip install sqlalchemy声明式模型(Declarative Base)
python
"""SQLAlchemy ORM 示例 - 用对象操作数据库.
对比 TypeORM/Prisma:
- declarative_base() ≈ Prisma Schema 的模型定义
- Session ≈ Prisma Client 的查询会话
- engine ≈ 数据库连接池
"""
from sqlalchemy import create_engine, Column, Integer, String, DateTime, func
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
# 1. 创建引擎(连接数据库)
# SQLite 示例:echo=True 会打印所有 SQL 语句(调试用)
engine = create_engine("sqlite:///data/rpa_orm.db", echo=False)
# 2. 声明基类(类似 Prisma 的模型基类)
Base = declarative_base()
# 3. 定义模型(类似 Prisma Schema)
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, nullable=False)
status = Column(String, default="pending")
priority = Column(Integer, default=1)
created_at = Column(DateTime, default=datetime.now)
completed_at = Column(DateTime, nullable=True)
def __repr__(self):
return f"<Task(id={self.id}, name='{self.name}', status='{self.status}')>"
class EmailLog(Base):
__tablename__ = "email_logs"
id = Column(Integer, primary_key=True)
recipient = Column(String, nullable=False)
subject = Column(String)
sent_at = Column(DateTime, default=datetime.now)
status = Column(String, default="sent") # sent, failed, pending
# 4. 创建所有表(类似 prisma migrate dev)
Base.metadata.create_all(engine)
# 5. 创建会话工厂(类似 Prisma Client 实例)
SessionLocal = sessionmaker(bind=engine)CRUD 操作
python
# 创建会话
session = SessionLocal()
# ========== 创建(Create)==========
new_task = Task(name="爬取竞品价格", priority=2)
session.add(new_task)
session.commit() # 提交事务
session.refresh(new_task) # 刷新以获取自动生成的 ID
print(f"创建任务: {new_task.id}")
# 批量创建
tasks_to_add = [
Task(name="生成周报", priority=1),
Task(name="数据清洗", priority=3),
]
session.add_all(tasks_to_add)
session.commit()
# ========== 查询(Read)==========
# 查询所有
all_tasks = session.query(Task).all()
# 条件查询(类似 Prisma 的 where)
pending_tasks = session.query(Task).filter(Task.status == "pending").all()
# 多条件 + 排序 + 限制(类似 Prisma 的 findMany)
high_priority = (
session.query(Task)
.filter(Task.status == "pending", Task.priority >= 2)
.order_by(Task.priority.desc())
.limit(5)
.all()
)
# 查询单条(类似 Prisma 的 findUnique)
task = session.query(Task).filter_by(id=1).first()
if task:
print(f"找到任务: {task.name}")
# 计数(类似 Prisma 的 count)
count = session.query(Task).filter(Task.status == "completed").count()
# ========== 更新(Update)==========
task_to_update = session.query(Task).filter_by(name="爬取竞品价格").first()
if task_to_update:
task_to_update.status = "completed"
task_to_update.completed_at = datetime.now()
session.commit()
# ========== 删除(Delete)==========
task_to_delete = session.query(Task).filter_by(status="completed").first()
if task_to_delete:
session.delete(task_to_delete)
session.commit()
session.close()关系定义(一对多 / 多对一)
python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Project(Base):
__tablename__ = "projects"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
description = Column(String)
# 定义一对多关系:一个项目有多个任务
# 参数说明:
# "Task" - 关联的模型类名
# back_populates="project" - Task 模型中对应的反向关系属性名
tasks = relationship("Task", back_populates="project")
class Task(Base):
__tablename__ = "tasks"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
project_id = Column(Integer, ForeignKey("projects.id"), nullable=True)
# 多对一关系:每个任务属于一个项目
project = relationship("Project", back_populates="tasks")
# 使用关系
session = SessionLocal()
proj = Project(name="RPA 办公自动化", description="Q4 自动化项目")
t1 = Task(name="自动发邮件", project=proj)
t2 = Task(name="生成报表", project=proj)
session.add_all([proj, t1, t2])
session.commit()
# 通过关系属性访问
print(f"项目 '{proj.name}' 的任务数: {len(proj.tasks)}")
print(f"任务 '{t1.name}' 所属项目: {t1.project.name}")
session.close()前端类比
| SQLAlchemy | Prisma / TypeORM |
|---|---|
declarative_base() + 模型类 | schema.prisma 中的 model 定义 |
session.add(obj) | prisma.model.create() |
session.query(Model).filter(...) | prisma.model.findMany({ where: {...} }) |
relationship(...) | 模型间的 @relation 字段 |
session.commit() | 自动提交或显式 $transaction |
5.4 pandas + SQL —— 数据清洗与入库
RPA 场景中常需要:读取 Excel → 清洗数据 → 存入数据库。pandas 提供了无缝的 SQL 交互能力。
安装
bash
pip install pandas sqlalchemyDataFrame 与数据库互转
python
"""pandas + SQL 交互 - RPA 数据处理的常见链路.
典型工作流:
Excel/CSV → pandas DataFrame → 清洗 → 存入 SQLite → 查询分析
"""
import pandas as pd
from sqlalchemy import create_engine
# 创建连接引擎
engine = create_engine("sqlite:///data/sales.db")
# ========== DataFrame 写入数据库 ==========
# 创建示例数据(实际中可能来自 Excel、API 或爬虫)
df = pd.DataFrame({
"product": ["iPhone 15", "MacBook Pro", "AirPods Pro", "iPhone 15"],
"region": ["华东", "华北", "华东", "华南"],
"quantity": [100, 50, 200, 80],
"price": [5999.0, 14999.0, 1899.0, 5999.0],
"date": pd.to_datetime(["2024-01-15", "2024-01-16", "2024-01-17", "2024-01-18"]),
})
# 写入数据库(自动创建表,if_exists='replace' 覆盖 / 'append' 追加)
df.to_sql("sales", engine, index=False, if_exists="replace")
print("数据已写入 sales 表")
# ========== 从数据库读取到 DataFrame ==========
# 读取整张表
df_from_db = pd.read_sql("SELECT * FROM sales", engine)
print(df_from_db)
# 读取时直接执行 SQL 查询(聚合、过滤在数据库层完成)
summary = pd.read_sql("""
SELECT
product,
SUM(quantity) as total_qty,
ROUND(AVG(price), 2) as avg_price,
SUM(quantity * price) as revenue
FROM sales
GROUP BY product
ORDER BY revenue DESC
""", engine)
print("\n产品销售汇总:")
print(summary)
# ========== 读取 Excel → 清洗 → 入库 ==========
# 假设从 Excel 读取了原始数据
# raw_df = pd.read_excel("raw_sales.xlsx")
# 清洗示例
cleaned_df = df.copy()
cleaned_df["revenue"] = cleaned_df["quantity"] * cleaned_df["price"]
cleaned_df = cleaned_df[cleaned_df["quantity"] > 0] # 过滤异常值
cleaned_df["region"] = cleaned_df["region"].str.strip() # 去除空格
# 清洗后入库
cleaned_df.to_sql("sales_cleaned", engine, index=False, if_exists="replace")
print(f"\n清洗后入库 {len(cleaned_df)} 条记录")数据库迁移简单方案
对于 RPA 脚本和小型项目,可以使用 alembic 进行数据库迁移管理:
bash
# 安装 alembic
pip install alembic
# 初始化迁移环境(类似 prisma migrate dev 的初始化)
alembic init alembic
# 修改 alembic.ini 中的数据库连接字符串
# 修改 alembic/env.py 导入你的 Base
# 创建迁移脚本
alembic revision --autogenerate -m "create tasks table"
# 执行迁移
alembic upgrade head何时使用 ORM vs 原生 SQL?
- 原生 SQLite:简单脚本、一次性任务、学习阶段
- SQLAlchemy ORM:中大型项目、多表关联、团队协作、需要类型安全
- pandas + SQL:数据清洗、分析、报表生成、与 Excel 互操作
5.5 RPA 场景实战:任务调度日志系统
结合本章所学,构建一个 RPA 任务调度日志系统——记录每次自动化任务的执行时间、状态和结果。
python
"""RPA 任务调度日志系统 - 综合运用 SQLite + SQLAlchemy.
功能:
- 注册自动化任务
- 记录每次执行日志(开始时间、结束时间、状态、错误信息)
- 查询任务成功率统计
"""
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Float, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime
from contextlib import contextmanager
engine = create_engine("sqlite:///data/rpa_scheduler.db", echo=False)
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)
class RpaTask(Base):
"""自动化任务定义"""
__tablename__ = "rpa_tasks"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False, unique=True)
description = Column(Text)
schedule = Column(String) # cron 表达式或描述
enabled = Column(Integer, default=1) # 1=启用, 0=禁用
logs = relationship("TaskLog", back_populates="task", cascade="all, delete-orphan")
class TaskLog(Base):
"""任务执行日志"""
__tablename__ = "task_logs"
id = Column(Integer, primary_key=True)
task_id = Column(Integer, ForeignKey("rpa_tasks.id"), nullable=False)
started_at = Column(DateTime, default=datetime.now)
finished_at = Column(DateTime, nullable=True)
status = Column(String, default="running") # running, success, failed
error_message = Column(Text, nullable=True)
duration_ms = Column(Integer, nullable=True)
task = relationship("RpaTask", back_populates="logs")
# 创建表
Base.metadata.create_all(engine)
@contextmanager
def get_session():
"""上下文管理器,自动管理 Session 生命周期."""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def register_task(name: str, description: str, schedule: str = "") -> int:
"""注册一个新任务."""
with get_session() as session:
task = RpaTask(name=name, description=description, schedule=schedule)
session.add(task)
session.flush() # 获取生成的 ID
return task.id
def start_log(task_id: int) -> int:
"""记录任务开始,返回日志 ID."""
with get_session() as session:
log = TaskLog(task_id=task_id, status="running")
session.add(log)
session.flush()
return log.id
def finish_log(log_id: int, status: str = "success", error: str = ""):
"""记录任务结束."""
with get_session() as session:
log = session.query(TaskLog).filter_by(id=log_id).first()
if log:
log.finished_at = datetime.now()
log.status = status
log.error_message = error if error else None
if log.started_at:
delta = log.finished_at - log.started_at
log.duration_ms = int(delta.total_seconds() * 1000)
def get_task_stats(task_id: int | None = None) -> dict:
"""查询任务执行统计."""
with get_session() as session:
query = session.query(TaskLog)
if task_id:
query = query.filter_by(task_id=task_id)
logs = query.all()
total = len(logs)
success = sum(1 for l in logs if l.status == "success")
failed = sum(1 for l in logs if l.status == "failed")
avg_duration = sum(l.duration_ms for l in logs if l.duration_ms) / max(success, 1)
return {
"total_runs": total,
"success": success,
"failed": failed,
"success_rate": f"{success / max(total, 1) * 100:.1f}%",
"avg_duration_ms": round(avg_duration, 1),
}
# ========== 使用示例 ==========
if __name__ == "__main__":
# 注册任务
task_id = register_task(
name="daily_email_report",
description="每日自动发送邮件报表",
schedule="0 9 * * *"
)
print(f"任务已注册,ID: {task_id}")
# 模拟执行
log_id = start_log(task_id)
print(f"任务开始,日志 ID: {log_id}")
# ... 这里执行实际的 RPA 逻辑 ...
finish_log(log_id, status="success")
print("任务完成")
# 查看统计
stats = get_task_stats(task_id)
print(f"\n任务统计: {stats}")本章小结
在本章中,你学习了:
- SQLite:零配置的本地数据库,适合 RPA 数据持久化
- SQLAlchemy ORM:用 Python 对象操作数据库,类似 Prisma/TypeORM
- pandas + SQL:数据清洗、分析与数据库的高效互操作
- 实战:构建了一个 RPA 任务调度日志系统
这些技能是后续网络请求数据存储、RPA 报表生成和 Web 后端开发的必备基础。