Skip to content

第 5 章:数据库基础

无论是 RPA 自动化中的数据记录,还是 Web 后端的数据持久化,数据库都是不可或缺的基础设施。作为前端开发者,你可能熟悉 IndexedDB 或 localStorage,Python 生态中的数据库工具同样丰富且强大。本章将从零开始,带你掌握 SQLite 嵌入式数据库、SQLAlchemy ORM 以及 pandas 数据交互。

5.1 为什么需要数据库

在 RPA 和自动化场景中,数据通常需要:

  • 持久化存储:脚本重启后数据不丢失
  • 结构化查询:按条件快速检索、过滤、聚合
  • 关系管理:多表关联,避免数据冗余
  • 并发安全:多任务同时读写时保证数据一致性

前端类比

  • SQLite ≈ 浏览器中的 IndexedDB(本地嵌入式数据库)
  • SQLAlchemy ORM ≈ Prisma / TypeORM(用对象操作替代手写 SQL)
  • pandas + SQL ≈ 用 JavaScript 数组方法处理从 API 获取的数据后存入数据库

5.2 SQLite —— 零配置的本地数据库

SQLite 是 Python 标准库内置的数据库引擎,无需单独安装服务器,一个文件就是一个数据库。它是 RPA 自动化中存储本地数据的首选方案。

基础 CRUD 操作

python
"""SQLite 基础操作 - 轻量级本地数据库.

无需安装第三方库,Python 标准库直接可用.
"""

import sqlite3
from pathlib import Path

# 连接数据库(如果不存在则自动创建)
# 传入 ':memory:' 可创建内存数据库(重启后数据丢失)
db_path = Path("data/my_rpa.db")
db_path.parent.mkdir(parents=True, exist_ok=True)

conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# ========== 创建表 ==========
cursor.execute("""
    CREATE TABLE IF NOT EXISTS tasks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        status TEXT DEFAULT 'pending',
        priority INTEGER DEFAULT 1,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        completed_at TIMESTAMP
    )
""")

# ========== 插入数据(Create)==========
tasks = [
    ("自动整理下载文件夹", "pending", 2),
    ("发送日报邮件", "pending", 3),
    ("备份数据库", "completed", 1),
]

cursor.executemany(
    "INSERT INTO tasks (name, status, priority) VALUES (?, ?, ?)",
    tasks
)
conn.commit()  # 必须 commit 才能持久化

# ========== 查询数据(Read)==========
# 查询所有任务
cursor.execute("SELECT * FROM tasks")
rows = cursor.fetchall()
for row in rows:
    print(row)

# 条件查询:只查未完成的优先级 >= 2 的任务
cursor.execute(
    "SELECT id, name, priority FROM tasks WHERE status = ? AND priority >= ? ORDER BY priority DESC",
    ("pending", 2)
)
pending_tasks = cursor.fetchall()
print(f"\n高优先级待办任务: {pending_tasks}")

# ========== 更新数据(Update)==========
cursor.execute(
    "UPDATE tasks SET status = ?, completed_at = CURRENT_TIMESTAMP WHERE name = ?",
    ("completed", "自动整理下载文件夹")
)
conn.commit()

# ========== 删除数据(Delete)==========
cursor.execute("DELETE FROM tasks WHERE status = ?", ("completed",))
conn.commit()
print(f"删除了 {cursor.rowcount} 条已完成任务")

# 关闭连接
cursor.close()
conn.close()

Row Factory:让查询结果更像字典

默认的 fetchall() 返回元组列表,通过 row_factory 可以转为字典,访问更直观:

python
import sqlite3

conn = sqlite3.connect("data/my_rpa.db")
conn.row_factory = sqlite3.Row  # 让每一行支持字典式访问

cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks WHERE status = ?", ("pending",))

for row in cursor.fetchall():
    # 现在可以像字典一样访问
    print(f"任务: {row['name']}, 优先级: {row['priority']}")

conn.close()

上下文管理器(自动提交和关闭)

python
import sqlite3

# 使用上下文管理器,自动处理 commit/rollback/close
with sqlite3.connect("data/my_rpa.db") as conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO tasks (name, status) VALUES (?, ?)", ("新任务", "pending"))
    # 退出 with 块时自动 commit

SQLite 适用场景

  • RPA 自动化中的本地数据记录(日志、任务状态、配置)
  • 小到中型 Web 应用的数据层
  • 原型开发阶段快速验证数据模型
  • 单用户桌面应用的数据存储

5.3 SQLAlchemy —— Python 的 Prisma

SQLAlchemy 是 Python 最成熟的 ORM(对象关系映射)库,让你用操作对象的方式替代手写 SQL。对于熟悉 Prisma 或 TypeORM 的前端开发者,概念几乎一致。

安装

bash
pip install sqlalchemy

声明式模型(Declarative Base)

python
"""SQLAlchemy ORM 示例 - 用对象操作数据库.

对比 TypeORM/Prisma:
  - declarative_base() ≈ Prisma Schema 的模型定义
  - Session ≈ Prisma Client 的查询会话
  - engine ≈ 数据库连接池
"""

from sqlalchemy import create_engine, Column, Integer, String, DateTime, func
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime

# 1. 创建引擎(连接数据库)
# SQLite 示例:echo=True 会打印所有 SQL 语句(调试用)
engine = create_engine("sqlite:///data/rpa_orm.db", echo=False)

# 2. 声明基类(类似 Prisma 的模型基类)
Base = declarative_base()

# 3. 定义模型(类似 Prisma Schema)
class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False)
    status = Column(String, default="pending")
    priority = Column(Integer, default=1)
    created_at = Column(DateTime, default=datetime.now)
    completed_at = Column(DateTime, nullable=True)

    def __repr__(self):
        return f"<Task(id={self.id}, name='{self.name}', status='{self.status}')>"

class EmailLog(Base):
    __tablename__ = "email_logs"

    id = Column(Integer, primary_key=True)
    recipient = Column(String, nullable=False)
    subject = Column(String)
    sent_at = Column(DateTime, default=datetime.now)
    status = Column(String, default="sent")  # sent, failed, pending

# 4. 创建所有表(类似 prisma migrate dev)
Base.metadata.create_all(engine)

# 5. 创建会话工厂(类似 Prisma Client 实例)
SessionLocal = sessionmaker(bind=engine)

CRUD 操作

python
# 创建会话
session = SessionLocal()

# ========== 创建(Create)==========
new_task = Task(name="爬取竞品价格", priority=2)
session.add(new_task)
session.commit()  # 提交事务
session.refresh(new_task)  # 刷新以获取自动生成的 ID
print(f"创建任务: {new_task.id}")

# 批量创建
tasks_to_add = [
    Task(name="生成周报", priority=1),
    Task(name="数据清洗", priority=3),
]
session.add_all(tasks_to_add)
session.commit()

# ========== 查询(Read)==========
# 查询所有
all_tasks = session.query(Task).all()

# 条件查询(类似 Prisma 的 where)
pending_tasks = session.query(Task).filter(Task.status == "pending").all()

# 多条件 + 排序 + 限制(类似 Prisma 的 findMany)
high_priority = (
    session.query(Task)
    .filter(Task.status == "pending", Task.priority >= 2)
    .order_by(Task.priority.desc())
    .limit(5)
    .all()
)

# 查询单条(类似 Prisma 的 findUnique)
task = session.query(Task).filter_by(id=1).first()
if task:
    print(f"找到任务: {task.name}")

# 计数(类似 Prisma 的 count)
count = session.query(Task).filter(Task.status == "completed").count()

# ========== 更新(Update)==========
task_to_update = session.query(Task).filter_by(name="爬取竞品价格").first()
if task_to_update:
    task_to_update.status = "completed"
    task_to_update.completed_at = datetime.now()
    session.commit()

# ========== 删除(Delete)==========
task_to_delete = session.query(Task).filter_by(status="completed").first()
if task_to_delete:
    session.delete(task_to_delete)
    session.commit()

session.close()

关系定义(一对多 / 多对一)

python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class Project(Base):
    __tablename__ = "projects"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    description = Column(String)

    # 定义一对多关系:一个项目有多个任务
    # 参数说明:
    #   "Task" - 关联的模型类名
    #   back_populates="project" - Task 模型中对应的反向关系属性名
    tasks = relationship("Task", back_populates="project")

class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    project_id = Column(Integer, ForeignKey("projects.id"), nullable=True)

    # 多对一关系:每个任务属于一个项目
    project = relationship("Project", back_populates="tasks")

# 使用关系
session = SessionLocal()
proj = Project(name="RPA 办公自动化", description="Q4 自动化项目")
t1 = Task(name="自动发邮件", project=proj)
t2 = Task(name="生成报表", project=proj)

session.add_all([proj, t1, t2])
session.commit()

# 通过关系属性访问
print(f"项目 '{proj.name}' 的任务数: {len(proj.tasks)}")
print(f"任务 '{t1.name}' 所属项目: {t1.project.name}")

session.close()

前端类比

SQLAlchemyPrisma / TypeORM
declarative_base() + 模型类schema.prisma 中的 model 定义
session.add(obj)prisma.model.create()
session.query(Model).filter(...)prisma.model.findMany({ where: {...} })
relationship(...)模型间的 @relation 字段
session.commit()自动提交或显式 $transaction

5.4 pandas + SQL —— 数据清洗与入库

RPA 场景中常需要:读取 Excel → 清洗数据 → 存入数据库。pandas 提供了无缝的 SQL 交互能力。

安装

bash
pip install pandas sqlalchemy

DataFrame 与数据库互转

python
"""pandas + SQL 交互 - RPA 数据处理的常见链路.

典型工作流:
  Excel/CSV → pandas DataFrame → 清洗 → 存入 SQLite → 查询分析
"""

import pandas as pd
from sqlalchemy import create_engine

# 创建连接引擎
engine = create_engine("sqlite:///data/sales.db")

# ========== DataFrame 写入数据库 ==========
# 创建示例数据(实际中可能来自 Excel、API 或爬虫)
df = pd.DataFrame({
    "product": ["iPhone 15", "MacBook Pro", "AirPods Pro", "iPhone 15"],
    "region": ["华东", "华北", "华东", "华南"],
    "quantity": [100, 50, 200, 80],
    "price": [5999.0, 14999.0, 1899.0, 5999.0],
    "date": pd.to_datetime(["2024-01-15", "2024-01-16", "2024-01-17", "2024-01-18"]),
})

# 写入数据库(自动创建表,if_exists='replace' 覆盖 / 'append' 追加)
df.to_sql("sales", engine, index=False, if_exists="replace")
print("数据已写入 sales 表")

# ========== 从数据库读取到 DataFrame ==========
# 读取整张表
df_from_db = pd.read_sql("SELECT * FROM sales", engine)
print(df_from_db)

# 读取时直接执行 SQL 查询(聚合、过滤在数据库层完成)
summary = pd.read_sql("""
    SELECT
        product,
        SUM(quantity) as total_qty,
        ROUND(AVG(price), 2) as avg_price,
        SUM(quantity * price) as revenue
    FROM sales
    GROUP BY product
    ORDER BY revenue DESC
""", engine)
print("\n产品销售汇总:")
print(summary)

# ========== 读取 Excel → 清洗 → 入库 ==========
# 假设从 Excel 读取了原始数据
# raw_df = pd.read_excel("raw_sales.xlsx")

# 清洗示例
cleaned_df = df.copy()
cleaned_df["revenue"] = cleaned_df["quantity"] * cleaned_df["price"]
cleaned_df = cleaned_df[cleaned_df["quantity"] > 0]  # 过滤异常值
cleaned_df["region"] = cleaned_df["region"].str.strip()  # 去除空格

# 清洗后入库
cleaned_df.to_sql("sales_cleaned", engine, index=False, if_exists="replace")
print(f"\n清洗后入库 {len(cleaned_df)} 条记录")

数据库迁移简单方案

对于 RPA 脚本和小型项目,可以使用 alembic 进行数据库迁移管理:

bash
# 安装 alembic
pip install alembic

# 初始化迁移环境(类似 prisma migrate dev 的初始化)
alembic init alembic

# 修改 alembic.ini 中的数据库连接字符串
# 修改 alembic/env.py 导入你的 Base

# 创建迁移脚本
alembic revision --autogenerate -m "create tasks table"

# 执行迁移
alembic upgrade head

何时使用 ORM vs 原生 SQL?

  • 原生 SQLite:简单脚本、一次性任务、学习阶段
  • SQLAlchemy ORM:中大型项目、多表关联、团队协作、需要类型安全
  • pandas + SQL:数据清洗、分析、报表生成、与 Excel 互操作

5.5 RPA 场景实战:任务调度日志系统

结合本章所学,构建一个 RPA 任务调度日志系统——记录每次自动化任务的执行时间、状态和结果。

python
"""RPA 任务调度日志系统 - 综合运用 SQLite + SQLAlchemy.

功能:
  - 注册自动化任务
  - 记录每次执行日志(开始时间、结束时间、状态、错误信息)
  - 查询任务成功率统计
"""

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Float, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from datetime import datetime
from contextlib import contextmanager

engine = create_engine("sqlite:///data/rpa_scheduler.db", echo=False)
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)


class RpaTask(Base):
    """自动化任务定义"""
    __tablename__ = "rpa_tasks"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False, unique=True)
    description = Column(Text)
    schedule = Column(String)  # cron 表达式或描述
    enabled = Column(Integer, default=1)  # 1=启用, 0=禁用

    logs = relationship("TaskLog", back_populates="task", cascade="all, delete-orphan")


class TaskLog(Base):
    """任务执行日志"""
    __tablename__ = "task_logs"

    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey("rpa_tasks.id"), nullable=False)
    started_at = Column(DateTime, default=datetime.now)
    finished_at = Column(DateTime, nullable=True)
    status = Column(String, default="running")  # running, success, failed
    error_message = Column(Text, nullable=True)
    duration_ms = Column(Integer, nullable=True)

    task = relationship("RpaTask", back_populates="logs")


# 创建表
Base.metadata.create_all(engine)


@contextmanager
def get_session():
    """上下文管理器,自动管理 Session 生命周期."""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def register_task(name: str, description: str, schedule: str = "") -> int:
    """注册一个新任务."""
    with get_session() as session:
        task = RpaTask(name=name, description=description, schedule=schedule)
        session.add(task)
        session.flush()  # 获取生成的 ID
        return task.id


def start_log(task_id: int) -> int:
    """记录任务开始,返回日志 ID."""
    with get_session() as session:
        log = TaskLog(task_id=task_id, status="running")
        session.add(log)
        session.flush()
        return log.id


def finish_log(log_id: int, status: str = "success", error: str = ""):
    """记录任务结束."""
    with get_session() as session:
        log = session.query(TaskLog).filter_by(id=log_id).first()
        if log:
            log.finished_at = datetime.now()
            log.status = status
            log.error_message = error if error else None
            if log.started_at:
                delta = log.finished_at - log.started_at
                log.duration_ms = int(delta.total_seconds() * 1000)


def get_task_stats(task_id: int | None = None) -> dict:
    """查询任务执行统计."""
    with get_session() as session:
        query = session.query(TaskLog)
        if task_id:
            query = query.filter_by(task_id=task_id)

        logs = query.all()
        total = len(logs)
        success = sum(1 for l in logs if l.status == "success")
        failed = sum(1 for l in logs if l.status == "failed")
        avg_duration = sum(l.duration_ms for l in logs if l.duration_ms) / max(success, 1)

        return {
            "total_runs": total,
            "success": success,
            "failed": failed,
            "success_rate": f"{success / max(total, 1) * 100:.1f}%",
            "avg_duration_ms": round(avg_duration, 1),
        }


# ========== 使用示例 ==========
if __name__ == "__main__":
    # 注册任务
    task_id = register_task(
        name="daily_email_report",
        description="每日自动发送邮件报表",
        schedule="0 9 * * *"
    )
    print(f"任务已注册,ID: {task_id}")

    # 模拟执行
    log_id = start_log(task_id)
    print(f"任务开始,日志 ID: {log_id}")

    # ... 这里执行实际的 RPA 逻辑 ...

    finish_log(log_id, status="success")
    print("任务完成")

    # 查看统计
    stats = get_task_stats(task_id)
    print(f"\n任务统计: {stats}")

本章小结

在本章中,你学习了:

  • SQLite:零配置的本地数据库,适合 RPA 数据持久化
  • SQLAlchemy ORM:用 Python 对象操作数据库,类似 Prisma/TypeORM
  • pandas + SQL:数据清洗、分析与数据库的高效互操作
  • 实战:构建了一个 RPA 任务调度日志系统

这些技能是后续网络请求数据存储、RPA 报表生成和 Web 后端开发的必备基础。

从前端到 Python 开发者的进化之路