PostgreSQL 持久化工作流实战:30 分钟搭建任务编排系统
用 PostgreSQL 替代 Temporal/Airflow,30 分钟搭建轻量级持久化工作流引擎
进阶 · 30 分钟 · 2026年5月29日
教程目标
在 30 分钟内,用 PostgreSQL 搭建一个轻量级持久化工作流引擎。不需要额外部署 Temporal、Airflow 等重型编排系统——PostgreSQL 本身就是编排器。
你将学会什么
- 用 PostgreSQL 表实现工作流状态持久化
- 用
SELECT ... FOR UPDATE SKIP LOCKED实现多 Worker 任务分发 - 实现断点续跑和 Worker 崩溃自动恢复
- 用 SQL 查询直接获取工作流运行状态,无需专属 Dashboard
准备清单
- PostgreSQL 14+(本地安装,或用 Neon / Supabase 云服务,免费版即可)
- Python 3.9+(示例用 psycopg2,换成 Node.js/Go 同理)
- 一个需要可靠异步执行的任务场景
总体架构
传统工作流引擎(Temporal、Airflow、AWS Step Functions)依赖「中央编排器 + Worker 池」两层架构。编排器负责接收任务、分配 Worker、记录状态、处理故障转移。这套架构功能强大,但运维成本高——需要额外部署和维护编排器集群。
PostgreSQL 方案把编排逻辑「下沉」到数据库层:应用服务器不再通过中间人通信,而是直接读写 Postgres 表来完成任务的取用、执行和状态更新。
| 组件 | 传统方案 | PostgreSQL 方案 |
|---|---|---|
| 任务队列 | Redis / RabbitMQ | Postgres 表 + SKIP LOCKED |
| 状态存储 | 编排器内部 KV 存储 | Postgres 行级数据 |
| Worker 发现 | 编排器主动分配 | Worker 轮询 + 行锁竞争 |
| 故障恢复 | 编排器心跳检测 → 重分配 | Worker 读检查点 → 跳过已完成步骤 |
| 可观性 | 专属 Web Dashboard | SQL 查询,可与现有监控工具集成 |
第 1 步:创建工作流状态表(5 分钟)
设计两张核心表:workflows 记录工作流实例的全局状态,workflow_steps 记录每一步的执行历史——这张表就是你的检查点存储。
-- 工作流实例表
CREATE TABLE workflows (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_type VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
input_data JSONB,
current_step INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
error_message TEXT,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- 步骤执行记录(检查点)
CREATE TABLE workflow_steps (
id SERIAL PRIMARY KEY,
workflow_id UUID REFERENCES workflows(id),
step_number INTEGER NOT NULL,
step_name VARCHAR(200),
status VARCHAR(20) NOT NULL DEFAULT 'pending',
output_data JSONB,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error_message TEXT,
UNIQUE(workflow_id, step_number)
);
-- 索引:加速 Worker 轮询
CREATE INDEX idx_workflows_status ON workflows(status, created_at);
CREATE INDEX idx_workflow_steps_lookup ON workflow_steps(workflow_id, step_number);关键设计:
UNIQUE(workflow_id, step_number)约束是整个系统的安全保障。如果两个 Worker 同时尝试写入同一步骤的结果,PostgreSQL 会让后到达的事务直接失败(违反唯一约束),从而保证每个步骤只被执行一次。
第 2 步:实现多 Worker 任务分发(10 分钟)
核心机制只有一条 SQL:SELECT ... FOR UPDATE SKIP LOCKED。它做了三件事:
- 锁定一条
pending状态的工作流行 - 自动跳过已被其他 Worker 锁定的行(
SKIP LOCKED) - 保证每条工作流全局只被一个 Worker 取走
import psycopg2
import json
def dequeue_workflow(conn):
"""从 Postgres 表中取出一条待执行的工作流"""
with conn.cursor() as cur:
cur.execute("""
SELECT id, workflow_type, input_data, current_step
FROM workflows
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED
""")
row = cur.fetchone()
if row:
cur.execute(
"UPDATE workflows SET status = 'running', updated_at = now() WHERE id = %s",
(row[0],)
)
conn.commit()
return {
"id": row[0],
"type": row[1],
"input": row[2],
"step": row[3]
}
conn.rollback()
return NoneSKIP LOCKED 是 PostgreSQL 9.5 引入的特性,它的出现让数据库可以直接充当消息队列——不需要 Redis、不需要 RabbitMQ、不需要额外的中间件。这就是「PostgreSQL 就是编排器」的核心技术基础。
第 3 步:实现检查点与故障恢复(10 分钟)
每个步骤执行完成后,将结果写入 workflow_steps 表。如果 Worker 在执行中途崩溃,下一个接手该工作流的 Worker 会读取已有的检查点记录,跳过已完成的步骤,从断点继续执行。
def execute_step(conn, workflow_id, step_number, step_fn, step_name=""):
"""执行一个步骤,自动处理跳过已完成的步骤和故障恢复"""
# 先检查该步骤是否已完成(故障恢复的关键)
with conn.cursor() as cur:
cur.execute(
"""SELECT status, output_data FROM workflow_steps
WHERE workflow_id = %s AND step_number = %s""",
(workflow_id, step_number)
)
existing = cur.fetchone()
if existing and existing[0] == 'completed':
return existing[1] # 已完成的步骤直接返回,零开销跳过
# 创建步骤记录(ON CONFLICT DO NOTHING 防止并发重复)
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO workflow_steps (workflow_id, step_number, step_name, status, started_at)
VALUES (%s, %s, %s, 'running', now())
ON CONFLICT (workflow_id, step_number) DO NOTHING""",
(workflow_id, step_number, step_name)
)
conn.commit()
try:
result = step_fn()
with conn.cursor() as cur:
cur.execute(
"""UPDATE workflow_steps
SET status = 'completed', output_data = %s, completed_at = now()
WHERE workflow_id = %s AND step_number = %s""",
(json.dumps(result), workflow_id, step_number)
)
cur.execute(
"UPDATE workflows SET current_step = %s, updated_at = now() WHERE id = %s",
(step_number, workflow_id)
)
conn.commit()
return result
except Exception as e:
with conn.cursor() as cur:
cur.execute(
"""UPDATE workflow_steps
SET status = 'failed', error_message = %s, completed_at = now()
WHERE workflow_id = %s AND step_number = %s""",
(str(e), workflow_id, step_number)
)
conn.commit()
raise| 场景 | 传统编排器 | PostgreSQL 方案 |
|---|---|---|
| Worker 崩溃 | 编排器检测心跳超时 → 重新分配任务 | 下一个 Worker 读检查点 → 跳过已完成步骤继续 |
| 并发冲突 | 编排器保证单 Worker 执行 | SKIP LOCKED + UNIQUE 约束双重保护 |
| 步骤超时 | 需在编排器单独配置 | Python 层设 timeout,超时标记为 failed |
| 水平扩展 | 需配置 Worker 注册与发现 | 直接加 Worker 进程,每进程独立轮询 Postgres |
第 4 步:SQL 即可观性(5 分钟)
工作流的所有状态都存在 Postgres 表中,这意味着 SQL 就是你的监控 Dashboard——不需要 Prometheus、不需要 Grafana。
-- 查看 24 小时内各类型工作流的成功率
SELECT
workflow_type,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'completed') / COUNT(*), 1) AS success_pct
FROM workflows
WHERE created_at > now() - INTERVAL '24 hours'
GROUP BY workflow_type;
-- 找出卡住超过 5 分钟的工作流(可能是 Worker 崩溃未恢复)
SELECT id, workflow_type, current_step, updated_at
FROM workflows
WHERE status = 'running'
AND updated_at < now() - INTERVAL '5 minutes';
-- 分析各步骤平均执行时长,找出瓶颈步骤
SELECT
w.workflow_type,
ws.step_name,
ROUND(AVG(EXTRACT(EPOCH FROM (ws.completed_at - ws.started_at)))::numeric, 2) AS avg_seconds
FROM workflow_steps ws
JOIN workflows w ON w.id = ws.workflow_id
WHERE ws.status = 'completed'
GROUP BY w.workflow_type, ws.step_name
ORDER BY avg_seconds DESC;SQL 的可观性带来了一个额外好处:你可以把工作流监控查询嵌入现有的告警系统(比如定时跑 SELECT ... WHERE status = 'running' AND updated_at < now() - INTERVAL '5 minutes',有结果就发告警)。
常见问题排查(FAQ)
Q1:和 n8n / Temporal 有什么区别,什么时候选 PostgreSQL?
n8n 适合非开发者做 API 编排,Temporal 适合超大规模分布式事务场景。PostgreSQL 方案适合「项目已经有 Postgres,不想为工作流引擎再引入一套基础设施」的场景。如果你在做一个 SaaS 产品,后端已经有 Postgres,这套方案零额外运维成本。
Q2:并发量大时单机 Postgres 能扛住吗?
一台 8 vCPU 的 PostgreSQL 实例可以处理每秒数万条工作流的入队和出队操作。如果将来需要更大规模,可以用 CockroachDB(兼容 PostgreSQL 协议)做分布式扩展,或者用 Citus 做分片——应用层代码不需要改变。
Q3:已经用了 Redis 做队列,还需要迁移吗?
不需要。PostgreSQL 方案解决的是「工作流需要状态持久化和故障恢复」的场景,而不是取代 Redis 做简单的消息投递。如果你的任务就是 fire-and-forget(发完就不用管),Redis 够用。如果任务有 5 个步骤、每个步骤可能失败需要重试、中间状态需要保存——那就是 PostgreSQL 主场。
工具词条
本文涉及的工具和平台:PostgreSQL(核心引擎)、Neon(Serverless Postgres,适合免运维起步)、Supabase(托管 Postgres + API 层)、n8n(业务用户可视化编排替代方案)、DBOS(基于本文思路的企业级框架,提供 SDK 封装)。