WayToClawEarn
进阶30 分钟2026年5月29日

PostgreSQL 持久化工作流实战:30 分钟搭建任务编排系统

用 PostgreSQL 替代 Temporal/Airflow,30 分钟搭建轻量级持久化工作流引擎

进阶 · 30 分钟 · 2026年5月29日

教程目标

在 30 分钟内,用 PostgreSQL 搭建一个轻量级持久化工作流引擎。不需要额外部署 Temporal、Airflow 等重型编排系统——PostgreSQL 本身就是编排器。

你将学会什么

  • 用 PostgreSQL 表实现工作流状态持久化
  • SELECT ... FOR UPDATE SKIP LOCKED 实现多 Worker 任务分发
  • 实现断点续跑和 Worker 崩溃自动恢复
  • 用 SQL 查询直接获取工作流运行状态,无需专属 Dashboard

准备清单

  • PostgreSQL 14+(本地安装,或用 Neon / Supabase 云服务,免费版即可)
  • Python 3.9+(示例用 psycopg2,换成 Node.js/Go 同理)
  • 一个需要可靠异步执行的任务场景

总体架构

传统工作流引擎(Temporal、Airflow、AWS Step Functions)依赖「中央编排器 + Worker 池」两层架构。编排器负责接收任务、分配 Worker、记录状态、处理故障转移。这套架构功能强大,但运维成本高——需要额外部署和维护编排器集群。

PostgreSQL 方案把编排逻辑「下沉」到数据库层:应用服务器不再通过中间人通信,而是直接读写 Postgres 表来完成任务的取用、执行和状态更新。

组件传统方案PostgreSQL 方案
任务队列Redis / RabbitMQPostgres 表 + SKIP LOCKED
状态存储编排器内部 KV 存储Postgres 行级数据
Worker 发现编排器主动分配Worker 轮询 + 行锁竞争
故障恢复编排器心跳检测 → 重分配Worker 读检查点 → 跳过已完成步骤
可观性专属 Web DashboardSQL 查询,可与现有监控工具集成

数据库驱动的工作流引擎架构对比

第 1 步:创建工作流状态表(5 分钟)

设计两张核心表:workflows 记录工作流实例的全局状态,workflow_steps 记录每一步的执行历史——这张表就是你的检查点存储。

sql
-- 工作流实例表
CREATE TABLE workflows (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_type VARCHAR(100) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    input_data JSONB,
    current_step INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,
    error_message TEXT,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

-- 步骤执行记录(检查点)
CREATE TABLE workflow_steps (
    id SERIAL PRIMARY KEY,
    workflow_id UUID REFERENCES workflows(id),
    step_number INTEGER NOT NULL,
    step_name VARCHAR(200),
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    output_data JSONB,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    error_message TEXT,
    UNIQUE(workflow_id, step_number)
);

-- 索引:加速 Worker 轮询
CREATE INDEX idx_workflows_status ON workflows(status, created_at);
CREATE INDEX idx_workflow_steps_lookup ON workflow_steps(workflow_id, step_number);

关键设计UNIQUE(workflow_id, step_number) 约束是整个系统的安全保障。如果两个 Worker 同时尝试写入同一步骤的结果,PostgreSQL 会让后到达的事务直接失败(违反唯一约束),从而保证每个步骤只被执行一次。

第 2 步:实现多 Worker 任务分发(10 分钟)

核心机制只有一条 SQL:SELECT ... FOR UPDATE SKIP LOCKED。它做了三件事:

  1. 锁定一条 pending 状态的工作流行
  2. 自动跳过已被其他 Worker 锁定的行(SKIP LOCKED
  3. 保证每条工作流全局只被一个 Worker 取走
python
import psycopg2
import json

def dequeue_workflow(conn):
    """从 Postgres 表中取出一条待执行的工作流"""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, workflow_type, input_data, current_step
            FROM workflows
            WHERE status = 'pending'
            ORDER BY created_at
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        """)
        row = cur.fetchone()
        if row:
            cur.execute(
                "UPDATE workflows SET status = 'running', updated_at = now() WHERE id = %s",
                (row[0],)
            )
            conn.commit()
            return {
                "id": row[0],
                "type": row[1],
                "input": row[2],
                "step": row[3]
            }
    conn.rollback()
    return None

PostgreSQL 行锁与 SKIP LOCKED 机制示意

SKIP LOCKED 是 PostgreSQL 9.5 引入的特性,它的出现让数据库可以直接充当消息队列——不需要 Redis、不需要 RabbitMQ、不需要额外的中间件。这就是「PostgreSQL 就是编排器」的核心技术基础。

第 3 步:实现检查点与故障恢复(10 分钟)

每个步骤执行完成后,将结果写入 workflow_steps 表。如果 Worker 在执行中途崩溃,下一个接手该工作流的 Worker 会读取已有的检查点记录,跳过已完成的步骤,从断点继续执行。

python
def execute_step(conn, workflow_id, step_number, step_fn, step_name=""):
    """执行一个步骤,自动处理跳过已完成的步骤和故障恢复"""

# 先检查该步骤是否已完成(故障恢复的关键)
    with conn.cursor() as cur:
        cur.execute(
            """SELECT status, output_data FROM workflow_steps
               WHERE workflow_id = %s AND step_number = %s""",
            (workflow_id, step_number)
        )
        existing = cur.fetchone()
        if existing and existing[0] == 'completed':
            return existing[1]  # 已完成的步骤直接返回,零开销跳过

# 创建步骤记录(ON CONFLICT DO NOTHING 防止并发重复)
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO workflow_steps (workflow_id, step_number, step_name, status, started_at)
               VALUES (%s, %s, %s, 'running', now())
               ON CONFLICT (workflow_id, step_number) DO NOTHING""",
            (workflow_id, step_number, step_name)
        )
        conn.commit()

    try:
        result = step_fn()
        with conn.cursor() as cur:
            cur.execute(
                """UPDATE workflow_steps
                   SET status = 'completed', output_data = %s, completed_at = now()
                   WHERE workflow_id = %s AND step_number = %s""",
                (json.dumps(result), workflow_id, step_number)
            )
            cur.execute(
                "UPDATE workflows SET current_step = %s, updated_at = now() WHERE id = %s",
                (step_number, workflow_id)
            )
        conn.commit()
        return result
    except Exception as e:
        with conn.cursor() as cur:
            cur.execute(
                """UPDATE workflow_steps
                   SET status = 'failed', error_message = %s, completed_at = now()
                   WHERE workflow_id = %s AND step_number = %s""",
                (str(e), workflow_id, step_number)
            )
        conn.commit()
        raise
场景传统编排器PostgreSQL 方案
Worker 崩溃编排器检测心跳超时 → 重新分配任务下一个 Worker 读检查点 → 跳过已完成步骤继续
并发冲突编排器保证单 Worker 执行SKIP LOCKED + UNIQUE 约束双重保护
步骤超时需在编排器单独配置Python 层设 timeout,超时标记为 failed
水平扩展需配置 Worker 注册与发现直接加 Worker 进程,每进程独立轮询 Postgres

第 4 步:SQL 即可观性(5 分钟)

工作流的所有状态都存在 Postgres 表中,这意味着 SQL 就是你的监控 Dashboard——不需要 Prometheus、不需要 Grafana。

sql
-- 查看 24 小时内各类型工作流的成功率
SELECT
    workflow_type,
    COUNT(*) AS total,
    COUNT(*) FILTER (WHERE status = 'completed') AS completed,
    ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'completed') / COUNT(*), 1) AS success_pct
FROM workflows
WHERE created_at > now() - INTERVAL '24 hours'
GROUP BY workflow_type;

-- 找出卡住超过 5 分钟的工作流(可能是 Worker 崩溃未恢复)
SELECT id, workflow_type, current_step, updated_at
FROM workflows
WHERE status = 'running'
  AND updated_at < now() - INTERVAL '5 minutes';

-- 分析各步骤平均执行时长,找出瓶颈步骤
SELECT
    w.workflow_type,
    ws.step_name,
    ROUND(AVG(EXTRACT(EPOCH FROM (ws.completed_at - ws.started_at)))::numeric, 2) AS avg_seconds
FROM workflow_steps ws
JOIN workflows w ON w.id = ws.workflow_id
WHERE ws.status = 'completed'
GROUP BY w.workflow_type, ws.step_name
ORDER BY avg_seconds DESC;

SQL 的可观性带来了一个额外好处:你可以把工作流监控查询嵌入现有的告警系统(比如定时跑 SELECT ... WHERE status = 'running' AND updated_at < now() - INTERVAL '5 minutes',有结果就发告警)。

常见问题排查(FAQ)

Q1:和 n8n / Temporal 有什么区别,什么时候选 PostgreSQL?

n8n 适合非开发者做 API 编排,Temporal 适合超大规模分布式事务场景。PostgreSQL 方案适合「项目已经有 Postgres,不想为工作流引擎再引入一套基础设施」的场景。如果你在做一个 SaaS 产品,后端已经有 Postgres,这套方案零额外运维成本。

Q2:并发量大时单机 Postgres 能扛住吗?

一台 8 vCPU 的 PostgreSQL 实例可以处理每秒数万条工作流的入队和出队操作。如果将来需要更大规模,可以用 CockroachDB(兼容 PostgreSQL 协议)做分布式扩展,或者用 Citus 做分片——应用层代码不需要改变。

Q3:已经用了 Redis 做队列,还需要迁移吗?

不需要。PostgreSQL 方案解决的是「工作流需要状态持久化和故障恢复」的场景,而不是取代 Redis 做简单的消息投递。如果你的任务就是 fire-and-forget(发完就不用管),Redis 够用。如果任务有 5 个步骤、每个步骤可能失败需要重试、中间状态需要保存——那就是 PostgreSQL 主场。

工具词条

本文涉及的工具和平台:PostgreSQL(核心引擎)、Neon(Serverless Postgres,适合免运维起步)、Supabase(托管 Postgres + API 层)、n8n(业务用户可视化编排替代方案)、DBOS(基于本文思路的企业级框架,提供 SDK 封装)。

内链引导

免责声明:本站案例均为知识分享内容,仅供灵感与参考,不构成收益承诺;由此进行的外部执行与结果请自行判断并承担相应责任。

相关推荐