一、業務目標 & 前提假設
業務目標
• 支持 PDF OCR(多頁)和 圖片 OCR
• 任務耗時可能較長(幾十秒~幾分鐘)
• 要求:
• 支持大量併發任務,不會把 FastAPI 頂死
• 支持重試(雲 OCR 抖一下不要直接失敗)
• 支持服務重啓後任務可恢復(至少未執行/掛一半的任務還能補償)
• 支持任務狀態查詢(PENDING/RUNNING/SUCCESS/FAILED/進度)
前提假設
• 技術棧:FastAPI + ARQ + Redis + Postgres + 對象存儲(本地或 MinIO/OSS)
• OCR 方式:
• 可以是 雲 OCR API(百度/阿里/騰訊)——IO 密集,非常適合 async
• 或者 本地 OCR 服務(例如 PaddleOCR 獨立服務),ARQ 只負責調服務
關鍵點:真正重 CPU/GPU 的推理最好在獨立的推理服務裏跑,ARQ 更適合作為“編排 + IO 請求調度”。
⸻
二、基於 ARQ 的整體架構設計
- 組件劃分
- FastAPI 服務(api-service)
• 提供 HTTP API:
• POST /ocr/tasks:上傳文件 / 提交任務,返回 task_id
• GET /ocr/tasks/{task_id}:查詢任務狀態+進度+結果摘要
• 負責:
• 文件接收 & 存儲(寫到對象存儲/本地磁盤)
• 創建 DB 記錄(任務 & 文檔 & 頁)
• 把任務扔進 ARQ 隊列(只傳 ID,不傳大文件) - Redis
• ARQ 的隊列 + 任務結果存儲
• 只存少量任務參數 / 狀態,不存大文本(避免 Redis 爆) - ARQ Worker(ocr-worker)
• 使用 arq worker.WorkerSettings 啓動
• 核心任務:
• ocr_document(doc_id, retry_count=0)
• 內部:拆頁 → 併發調用 OCR → 存 DB → 更新進度 → 合併結果
• 任務函數全部使用 async def,適配雲 OCR / HTTP 調用場景 - Postgres
• 存任務狀態 & 結果:
• ocr_task 表:任務級別(PDF/圖片)
• ocr_page 表:按頁存儲識別結果
• 提供數據持久化,保證重啓後不會丟結果 - 對象存儲 / 本地文件系統
• 存原始 PDF/圖片 + 拆頁後的中間圖片(如果有)
⸻
- 任務處理流程(以 PDF 為例)
- 提交任務(FastAPI)
• 用户上傳 PDF
• API 做的事情: - 保存文件到存儲,得到 file_path 或 file_key
- 在 ocr_task 表插一條記錄:
• task_id
• file_path
• status = PENDING
• progress = 0 - 通過 ARQ 入隊:
job = await redis_pool.enqueue_job(
"ocr_document",
task_id,
retry_count=0,
)
4. 返回 task_id 給前端
2. Worker 側:ocr_document 任務邏輯
async def ocr_document(ctx, task_id: str, retry_count: int = 0):
db = ctx["db"] # 啓動時注入
try:
# 1. 更新任務狀態為 RUNNING
await db.update_task_status(task_id, "RUNNING")
# 2. 根據 task_id 查出 file_path,判斷是 PDF 還是圖片
task = await db.get_task(task_id)
file_path = task.file_path
if task.file_type == "pdf":
# 2.1 拆 PDF 為多頁圖片
page_paths = await split_pdf_to_images(file_path)
else:
page_paths = [file_path]
total = len(page_paths)
results = []
# 3. 控制併發調用 OCR(雲 OCR / 本地 OCR 服務)
sem = asyncio.Semaphore(5) # 限制同時請求數
async def ocr_one(i, page_path):
async with sem:
text, extra = await call_ocr_api(page_path)
await db.save_page_result(task_id, i, text, extra)
# 更新進度
await db.update_task_progress(task_id, int((i+1) / total * 100))
await asyncio.gather(*[
ocr_one(i, p) for i, p in enumerate(page_paths)
])
# 4. 合併結果/做後處理(可選)
await db.mark_task_success(task_id)
except TemporaryError as e:
# 自定義的“暫時性錯誤”,比如網絡/雲服務 5xx
MAX_RETRY = 3
if retry_count < MAX_RETRY:
# 10 秒後重試,並帶上 retry_count+1
from arq import Retry
raise Retry(defer=10, kwargs={
“task_id”: task_id,
“retry_count”: retry_count + 1
})
else:
await db.mark_task_failed(task_id, reason=str(e))
raise
except Exception as e:
# 其他不可恢復錯誤
await db.mark_task_failed(task_id, reason=str(e))
raise
3. 查詢任務結果
• GET /ocr/tasks/{task_id} 從 Postgres 讀:
• status
• progress
• 如果成功:可以返回文本摘要 / 頁數 / 下載鏈接
⸻
- 宕機 & 重啓時的恢復策略
1)Redis 隊列裏的任務
• 未開始執行的任務都在 Redis 裏
• 只要 Redis 沒掛(開啓 AOF 或持久化),重啓 worker 後會繼續執行
2)執行中的任務(RUNNING)
• 配置 job_timeout,比如 10 分鐘:
class WorkerSettings:
functions = [ocr_document]
redis_settings = RedisSettings(...)
job_timeout = 600
• 如果 worker 崩掉 / kill -9:
• Redis 認為這個 job 處於執行中,但 job_timeout 到期後會判定為失敗
• 我們的補償策略:
• 在 ocr_task 中維護 last_update_time(每處理一頁更新一次)
• 啓一個“巡檢任務”(可以是另一個定時腳本 / 服務):
• 定期掃描 status=RUNNING 且 last_update_time 超過 N 分鐘的任務
• 判斷為“疑似殭屍任務”
• 再次通過 ARQ enqueue_job("ocr_document", task_id, retry_count=當前+1)
這樣就實現了:
• 服務優雅關閉:worker 會把手上的任務跑完再退出
• 服務異常宕機:通過 job_timeout + last_update_time 把“半途掛掉”的任務重新入隊
⸻
三、使用 ARQ 做這類業務的優點
- 和 FastAPI 風格統一:全鏈路 async
• FastAPI 本身是 async 框架
• ARQ 的任務函數也是 async def,調用雲 OCR、對象存儲、DB 都是 await
• 整個項目是純 async 風格,思維模型一致,協程調度簡單清晰 - 對雲 OCR / HTTP IO 場景特別友好
• OCR 如果是走雲廠商 API,本地主要是網絡 IO + 等待時間
• 使用 ARQ + asyncio.gather 可以輕鬆做到:
• 一個 worker 同時跑多個 OCR 請求
• 控制併發(Semaphore)避免打爆雲服務 QPS
• CPU 不重的情況下:這種 async 併發非常高效 - 架構簡單、組件少
• 只需要 Redis(既做隊列又存 job 狀態)
• 對比 Celery:
• 無需 RabbitMQ / 額外 backend
• Worker 配置簡單,一個 WorkerSettings 就夠
對於你這種自己掌控部署、還要搞一堆微服務的人來説,少一個組件就少一堆運維心智負擔。
- 重試機制可按業務精細控制
• 用 Retry(defer=秒數, kwargs=…) 明確告訴 ARQ“過多久再重試”
• 很適合 OCR 裏這種“雲接口暫時 500/超時,再試幾次”的場景
• 你可以在任務中設計:
• 最大重試次數
• 重試間隔(固定/遞增)
• 哪些異常重試,哪些異常直接失敗
• 完全業務驅動,不被框架的黑魔法限制 - 適合“調度+編排”,而不是“重推理”
• 你本來就打算把 PaddleOCR / 大模型等重推理部分單獨做服務:
• ARQ 負責:排隊 → 調 OCR 服務 → 存結果 → 更新進度
• OCR 服務只負責推理
• 在這個定位下,ARQ 非常合適當“業務編排層”的隊列框架
⸻
四、使用 ARQ 的不足 / 風險點
- 僅支持 Redis,擴展性受限
• ARQ 目前只支持 Redis 作為隊列和結果存儲
• 如果你將來希望:
• 使用 RabbitMQ / Kafka / SQS 等更“重量級”的消息系統
• 或者需要更強的持久化語義 / 消息重放
• 那 ARQ 就不適合,需要換框架(例如 Celery 或自己對接 Kafka)
對你目前來説,Redis 足夠,但這是個中長期的約束。
- 沒有內置類似 Celery beat 的定時調度器
• ARQ 沒有像 Celery beat 那樣的“任務調度器”
• 如果你要:
• 定期掃描殭屍任務
• 定時批量做 OCR 任務
• 需要:
• 用 crontab / APScheduler / 一個小的 FastAPI 定時服務來自行實現
不是不能做,就是需要你自己寫一點調度邏輯。
- 重試策略需要自己封裝“標準化”
• ARQ 只提供一個 Retry 異常
• “最大重試次數、退避策略、統一日誌記錄”都需要你自己封裝一個小工具層
• 對你來講不難,但團隊協作時要保證所有任務遵循同一套規範
Celery 這塊有比較完整的官方支持(max_retries, countdown, retry_backoff 等)。
- 可視化監控和生態偏弱
• Celery 有 Flower,還有無數經驗博客
• ARQ 的生態比較“極客”,可視化監控需要你自己接:
• Prometheus + Grafana
• 自寫管理接口(比如列出任務狀態、處理速度等)
• 對你這種本來就要搭日誌/監控體系的人來説問題不大,但不如 Celery 開箱即用。 - 對 CPU/GPU 密集任務不是最優形態
• ARQ 是 async 單進程事件循環模型,要充分利用多核/多 GPU,需要:
• 啓動多個 worker 進程 / 容器
• 或把重 CPU 邏輯放到其他服務(推薦)
• Celery 的多進程 worker 模型在直接跑本地推理時更自然一些
對你的場景:推薦把重推理獨立服務化,ARQ 做調度,這個缺點就不算大問題。
- Redis 任務持久化要自己注意配置
• 如果 Redis 配置不好(比如純內存、沒有 AOF/RDB),崩潰時隊列裏的任務會丟
• ARQ 自己不管這些,需要你在 Redis 層:
• 開啓 RDB/AOF
• 做主從/哨兵(高可用)
不過這點不管 Celery/ARQ 都一樣:broker 崩了都得你自己兜底。
⸻
五、結合你當前業務的建議結論
如果我們只看你現在這條線:
• PDF / 圖片 OCR
• 很多調用雲 OCR、未來還要調智能編目、質檢等服務
• 有 FastAPI、Redis、Postgres 的基礎
• 你能接受自己封裝一層“任務重試 + 狀態管理 + 監控”
那麼:
✅ 用 ARQ 做“異步任務 & 編排層”是可行且好用的選擇,尤其是對於 IO 型任務(雲 OCR)很合適。
⚠️ 但前提是:
• 真正重推理(PaddleOCR / 大模型)放到獨立推理服務
• ARQ + Redis 只存 ID & 狀態,結果進 DB
• 你願意自己寫一點:重試封裝、殭屍任務恢復、監控接口。
如果你後面打算把這一套做成“全公司統一任務中台”,還要承載各種類型的任務(視頻轉碼、大模型推理等等),那可以:
• 當前 OCR 項目用 ARQ(輕便、開發快)
• 並並行規劃一套 更通用的 Celery 任務平台 作為長遠演進方向(甚至可以共存一段時間)