一、業務目標 & 前提假設

業務目標
• 支持 PDF OCR(多頁)和 圖片 OCR
• 任務耗時可能較長(幾十秒~幾分鐘)
• 要求:
• 支持大量併發任務,不會把 FastAPI 頂死
• 支持重試(雲 OCR 抖一下不要直接失敗)
• 支持服務重啓後任務可恢復(至少未執行/掛一半的任務還能補償)
• 支持任務狀態查詢(PENDING/RUNNING/SUCCESS/FAILED/進度)

前提假設
• 技術棧:FastAPI + ARQ + Redis + Postgres + 對象存儲(本地或 MinIO/OSS)
• OCR 方式:
• 可以是 雲 OCR API(百度/阿里/騰訊)——IO 密集,非常適合 async
• 或者 本地 OCR 服務(例如 PaddleOCR 獨立服務),ARQ 只負責調服務

關鍵點:真正重 CPU/GPU 的推理最好在獨立的推理服務裏跑,ARQ 更適合作為“編排 + IO 請求調度”。

二、基於 ARQ 的整體架構設計

  1. 組件劃分
  1. FastAPI 服務(api-service)
    • 提供 HTTP API:
    • POST /ocr/tasks:上傳文件 / 提交任務,返回 task_id
    • GET /ocr/tasks/{task_id}:查詢任務狀態+進度+結果摘要
    • 負責:
    • 文件接收 & 存儲(寫到對象存儲/本地磁盤)
    • 創建 DB 記錄(任務 & 文檔 & 頁)
    • 把任務扔進 ARQ 隊列(只傳 ID,不傳大文件)
  2. Redis
    • ARQ 的隊列 + 任務結果存儲
    • 只存少量任務參數 / 狀態,不存大文本(避免 Redis 爆)
  3. ARQ Worker(ocr-worker)
    • 使用 arq worker.WorkerSettings 啓動
    • 核心任務:
    • ocr_document(doc_id, retry_count=0)
    • 內部:拆頁 → 併發調用 OCR → 存 DB → 更新進度 → 合併結果
    • 任務函數全部使用 async def,適配雲 OCR / HTTP 調用場景
  4. Postgres
    • 存任務狀態 & 結果:
    • ocr_task 表:任務級別(PDF/圖片)
    • ocr_page 表:按頁存儲識別結果
    • 提供數據持久化,保證重啓後不會丟結果
  5. 對象存儲 / 本地文件系統
    • 存原始 PDF/圖片 + 拆頁後的中間圖片(如果有)

  1. 任務處理流程(以 PDF 為例)
  1. 提交任務(FastAPI)
    • 用户上傳 PDF
    • API 做的事情:
  2. 保存文件到存儲,得到 file_path 或 file_key
  3. 在 ocr_task 表插一條記錄:
    • task_id
    • file_path
    • status = PENDING
    • progress = 0
  4. 通過 ARQ 入隊:
job = await redis_pool.enqueue_job(
    "ocr_document",
    task_id,
    retry_count=0,
)
4.	返回 task_id 給前端

2.	Worker 側:ocr_document 任務邏輯
async def ocr_document(ctx, task_id: str, retry_count: int = 0):
    db = ctx["db"]  # 啓動時注入
    try:
        # 1. 更新任務狀態為 RUNNING
        await db.update_task_status(task_id, "RUNNING")

        # 2. 根據 task_id 查出 file_path,判斷是 PDF 還是圖片
        task = await db.get_task(task_id)
        file_path = task.file_path

        if task.file_type == "pdf":
            # 2.1 拆 PDF 為多頁圖片
            page_paths = await split_pdf_to_images(file_path)
        else:
            page_paths = [file_path]

        total = len(page_paths)
        results = []
# 3. 控制併發調用 OCR(雲 OCR / 本地 OCR 服務)

sem = asyncio.Semaphore(5) # 限制同時請求數
async def ocr_one(i, page_path):
async with sem:
text, extra = await call_ocr_api(page_path)
await db.save_page_result(task_id, i, text, extra)
# 更新進度
await db.update_task_progress(task_id, int((i+1) / total * 100))

await asyncio.gather(*[
    ocr_one(i, p) for i, p in enumerate(page_paths)
])

    # 4. 合併結果/做後處理(可選)

await db.mark_task_success(task_id)

except TemporaryError as e:
# 自定義的“暫時性錯誤”,比如網絡/雲服務 5xx
MAX_RETRY = 3
if retry_count < MAX_RETRY:
# 10 秒後重試,並帶上 retry_count+1
from arq import Retry
raise Retry(defer=10, kwargs={
“task_id”: task_id,
“retry_count”: retry_count + 1
})
else:
await db.mark_task_failed(task_id, reason=str(e))
raise

except Exception as e:
# 其他不可恢復錯誤
await db.mark_task_failed(task_id, reason=str(e))
raise

3.	查詢任務結果
•	GET /ocr/tasks/{task_id} 從 Postgres 讀:
•	status
•	progress
•	如果成功:可以返回文本摘要 / 頁數 / 下載鏈接

  1. 宕機 & 重啓時的恢復策略

1)Redis 隊列裏的任務
• 未開始執行的任務都在 Redis 裏
• 只要 Redis 沒掛(開啓 AOF 或持久化),重啓 worker 後會繼續執行

2)執行中的任務(RUNNING)
• 配置 job_timeout,比如 10 分鐘:

class WorkerSettings:
    functions = [ocr_document]
    redis_settings = RedisSettings(...)
    job_timeout = 600
•	如果 worker 崩掉 / kill -9:
•	Redis 認為這個 job 處於執行中,但 job_timeout 到期後會判定為失敗
•	我們的補償策略:
•	在 ocr_task 中維護 last_update_time(每處理一頁更新一次)
•	啓一個“巡檢任務”(可以是另一個定時腳本 / 服務):
•	定期掃描 status=RUNNING 且 last_update_time 超過 N 分鐘的任務
•	判斷為“疑似殭屍任務”
•	再次通過 ARQ enqueue_job("ocr_document", task_id, retry_count=當前+1)

這樣就實現了:
• 服務優雅關閉:worker 會把手上的任務跑完再退出
• 服務異常宕機:通過 job_timeout + last_update_time 把“半途掛掉”的任務重新入隊

三、使用 ARQ 做這類業務的優點

  1. 和 FastAPI 風格統一:全鏈路 async
    • FastAPI 本身是 async 框架
    • ARQ 的任務函數也是 async def,調用雲 OCR、對象存儲、DB 都是 await
    • 整個項目是純 async 風格,思維模型一致,協程調度簡單清晰
  2. 對雲 OCR / HTTP IO 場景特別友好
    • OCR 如果是走雲廠商 API,本地主要是網絡 IO + 等待時間
    • 使用 ARQ + asyncio.gather 可以輕鬆做到:
    • 一個 worker 同時跑多個 OCR 請求
    • 控制併發(Semaphore)避免打爆雲服務 QPS
    • CPU 不重的情況下:這種 async 併發非常高效
  3. 架構簡單、組件少
    • 只需要 Redis(既做隊列又存 job 狀態)
    • 對比 Celery:
    • 無需 RabbitMQ / 額外 backend
    • Worker 配置簡單,一個 WorkerSettings 就夠

對於你這種自己掌控部署、還要搞一堆微服務的人來説,少一個組件就少一堆運維心智負擔。

  1. 重試機制可按業務精細控制
    • 用 Retry(defer=秒數, kwargs=…) 明確告訴 ARQ“過多久再重試”
    • 很適合 OCR 裏這種“雲接口暫時 500/超時,再試幾次”的場景
    • 你可以在任務中設計:
    • 最大重試次數
    • 重試間隔(固定/遞增)
    • 哪些異常重試,哪些異常直接失敗
    • 完全業務驅動,不被框架的黑魔法限制
  2. 適合“調度+編排”,而不是“重推理”
    • 你本來就打算把 PaddleOCR / 大模型等重推理部分單獨做服務:
    • ARQ 負責:排隊 → 調 OCR 服務 → 存結果 → 更新進度
    • OCR 服務只負責推理
    • 在這個定位下,ARQ 非常合適當“業務編排層”的隊列框架

四、使用 ARQ 的不足 / 風險點

  1. 僅支持 Redis,擴展性受限
    • ARQ 目前只支持 Redis 作為隊列和結果存儲
    • 如果你將來希望:
    • 使用 RabbitMQ / Kafka / SQS 等更“重量級”的消息系統
    • 或者需要更強的持久化語義 / 消息重放
    • 那 ARQ 就不適合,需要換框架(例如 Celery 或自己對接 Kafka)

對你目前來説,Redis 足夠,但這是個中長期的約束。

  1. 沒有內置類似 Celery beat 的定時調度器
    • ARQ 沒有像 Celery beat 那樣的“任務調度器”
    • 如果你要:
    • 定期掃描殭屍任務
    • 定時批量做 OCR 任務
    • 需要:
    • 用 crontab / APScheduler / 一個小的 FastAPI 定時服務來自行實現

不是不能做,就是需要你自己寫一點調度邏輯。

  1. 重試策略需要自己封裝“標準化”
    • ARQ 只提供一個 Retry 異常
    • “最大重試次數、退避策略、統一日誌記錄”都需要你自己封裝一個小工具層
    • 對你來講不難,但團隊協作時要保證所有任務遵循同一套規範

Celery 這塊有比較完整的官方支持(max_retries, countdown, retry_backoff 等)。

  1. 可視化監控和生態偏弱
    • Celery 有 Flower,還有無數經驗博客
    • ARQ 的生態比較“極客”,可視化監控需要你自己接:
    • Prometheus + Grafana
    • 自寫管理接口(比如列出任務狀態、處理速度等)
    • 對你這種本來就要搭日誌/監控體系的人來説問題不大,但不如 Celery 開箱即用。
  2. 對 CPU/GPU 密集任務不是最優形態
    • ARQ 是 async 單進程事件循環模型,要充分利用多核/多 GPU,需要:
    • 啓動多個 worker 進程 / 容器
    • 或把重 CPU 邏輯放到其他服務(推薦)
    • Celery 的多進程 worker 模型在直接跑本地推理時更自然一些

對你的場景:推薦把重推理獨立服務化,ARQ 做調度,這個缺點就不算大問題。

  1. Redis 任務持久化要自己注意配置
    • 如果 Redis 配置不好(比如純內存、沒有 AOF/RDB),崩潰時隊列裏的任務會丟
    • ARQ 自己不管這些,需要你在 Redis 層:
    • 開啓 RDB/AOF
    • 做主從/哨兵(高可用)

不過這點不管 Celery/ARQ 都一樣:broker 崩了都得你自己兜底。

五、結合你當前業務的建議結論

如果我們只看你現在這條線:
• PDF / 圖片 OCR
• 很多調用雲 OCR、未來還要調智能編目、質檢等服務
• 有 FastAPI、Redis、Postgres 的基礎
• 你能接受自己封裝一層“任務重試 + 狀態管理 + 監控”

那麼:

✅ 用 ARQ 做“異步任務 & 編排層”是可行且好用的選擇,尤其是對於 IO 型任務(雲 OCR)很合適。
⚠️ 但前提是:
• 真正重推理(PaddleOCR / 大模型)放到獨立推理服務
• ARQ + Redis 只存 ID & 狀態,結果進 DB
• 你願意自己寫一點:重試封裝、殭屍任務恢復、監控接口。

如果你後面打算把這一套做成“全公司統一任務中台”,還要承載各種類型的任務(視頻轉碼、大模型推理等等),那可以:
• 當前 OCR 項目用 ARQ(輕便、開發快)
• 並並行規劃一套 更通用的 Celery 任務平台 作為長遠演進方向(甚至可以共存一段時間)