I. Reliability Analysis
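The architecture diagram shows that a worker blocks while it waits for the LLM service to respond. If the worker's container crashes during that wait, the task's status stays at processing forever. Because the Redis message carrying the task_id has already been consumed, nothing can identify the task and retry it.
To cover this scenario we add a reaper (inspection) service that periodically restarts tasks which have timed out while still in processing status. The service could read the task table from MySQL, but to limit the performance impact we build two new structures in Redis instead: a processing queue that stores the task_id of each task currently being executed, and a processing_ts hash that stores the time processing started. The reaper reads the processing queue and the processing_ts hash to find and recover tasks stuck in an abnormal state.
The worker logic is adapted accordingly: an atomic operation guarantees that taking a task and placing it in processing cannot be interrupted halfway.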
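To make the idea concrete, here is a minimal in-memory sketch of the ready/processing pattern described above. The `InMemoryQueues` class and its method names are hypothetical stand-ins for the Redis structures, not part of the project; the point is only that a task claimed by a crashed worker remains visible in processing.

```python
import time
from collections import deque


class InMemoryQueues:
    """Toy stand-in for the Redis structures (hypothetical, for illustration)."""

    def __init__(self):
        self.ready = deque()        # models the ready list
        self.processing = deque()   # models the processing list
        self.processing_ts = {}     # models the processing_ts hash

    def enqueue(self, task_id):
        # Producers push on the left; workers pop from the right (FIFO).
        self.ready.appendleft(task_id)

    def claim(self, now_ts=None):
        """Move one task from ready to processing and record its start time."""
        if not self.ready:
            return None
        task_id = self.ready.pop()
        self.processing.appendleft(task_id)
        self.processing_ts[task_id] = int(time.time()) if now_ts is None else now_ts
        return task_id

    def ack(self, task_id):
        """Called when the worker finishes, whether it succeeded or failed."""
        self.processing.remove(task_id)
        self.processing_ts.pop(task_id, None)


queues = InMemoryQueues()
queues.enqueue(101)
queues.enqueue(102)

claimed = queues.claim(now_ts=1_000)
# Simulate a crash: the worker never calls ack(), yet the task can still be found.
stuck = list(queues.processing)
```

In this toy model `claim()` is trivially uninterruptible because it runs in one thread; in Redis the same guarantee has to come from a single atomic command.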

II. Implementation
1. Add an atomic operation to doc_llm_test_worker that moves a task from ready to processing, and record the time execution starts
    import json
    import logging
    import time

    # redis_client and process_task are defined elsewhere in the project.
    TASK_QUEUE_READY_KEY = "docllm:queue:ready"
    TASK_QUEUE_PROCESSING_KEY = "docllm:queue:processing"
    TASK_PROCESSING_TS_KEY = "docllm:hash:processing_ts"


    def worker_loop():
        """Main loop of the document-check task worker."""
        logging.info("doc_llm_test_worker started, waiting for tasks...")
        while True:
            try:
                # Atomically pop from ready and push onto processing, blocking up to 10s.
                # BRPOPLPUSH is deprecated since Redis 6.2; BLMOVE is its replacement.
                raw_item = redis_client.brpoplpush(
                    TASK_QUEUE_READY_KEY, TASK_QUEUE_PROCESSING_KEY, timeout=10
                )
                if not raw_item:
                    time.sleep(5)
                    continue  # no task yet, go to the next round

                try:
                    payload_str = raw_item.decode("utf-8")
                    data = json.loads(payload_str)
                    task_id = int(data["task_id"])
                except Exception:
                    # A malformed item can never be processed, so drop it.
                    logging.exception(f"invalid processing queue item: {raw_item!r}")
                    redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)
                    continue

                # Record when processing started so the reaper can detect timeouts.
                start_ts = int(time.time())
                redis_client.hset(TASK_PROCESSING_TS_KEY, task_id, start_ts)

                try:
                    process_task(task_id)
                finally:
                    # Success or failure, the task is no longer in flight.
                    redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)
                    redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)
            except Exception:
                logging.exception("unexpected error in worker loop, sleep 3s")
                time.sleep(3)
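2. Add a reaper service that periodically restarts tasks which have timed out while still in processing status. It must handle both re-enqueueing and status recovery.
Set the parameter PROCESSING_TIMEOUT_SECONDS = 600
Timeout check:
now_ts - start_ts > PROCESSING_TIMEOUT_SECONDS
When the check holds, the task is treated as follows:
- The worker failed to process it (the worker crashed or hung)
- It needs to go back to pending
- It is pushed back onto the ready queue for a new worker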
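The timeout check above can be sketched as a small pure function. `find_timed_out` is a hypothetical helper name, and the dictionary stands in for a snapshot of the processing_ts hash; note that the comparison is strict, so a task that started exactly 600 seconds ago is not yet considered stuck.

```python
PROCESSING_TIMEOUT_SECONDS = 600


def find_timed_out(processing_ts, now_ts, timeout=PROCESSING_TIMEOUT_SECONDS):
    """Return the ids of tasks whose processing time exceeds the timeout."""
    return sorted(
        task_id
        for task_id, start_ts in processing_ts.items()
        if now_ts - start_ts > timeout
    )


# Snapshot of task_id -> start time, in seconds.
snapshot = {1: 1_000, 2: 1_500, 3: 1_400}
expired = find_timed_out(snapshot, now_ts=2_000)
```

Only task 1 (running for 1000 seconds) is selected; task 3 sits exactly on the 600-second boundary and task 2 is well within it.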
Adapt task_service so that the reaper service can update the task status in the database at the same time:
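    from sqlalchemy import func, update

    # get_session, TaskDocLLM and TaskStatus come from the project's existing modules.


    def mark_task_processing(task_id: int) -> bool:
        """Called when a worker has just picked up a task: pending -> processing."""
        with get_session() as session:
            stmt = (
                update(TaskDocLLM)
                .where(
                    TaskDocLLM.task_id == task_id,
                    TaskDocLLM.status == TaskStatus.pending,
                )
                .values(
                    status=TaskStatus.processing,
                    processing_started_at=func.now(),
                )
            )
            result = session.execute(stmt)
            session.commit()
            # rowcount is 1 only if this call performed the transition.
            return result.rowcount == 1


    def reclaim_task(task_id: int, timeout_dt) -> bool:
        """
        Put a timed-out task back to pending so it can be queued again.

        :param timeout_dt: datetime; a task is reclaimed only if it started
            processing earlier than this time. It must be expressed in the same
            time zone as the database's NOW(), which fills processing_started_at.
        """
        with get_session() as session:
            stmt = (
                update(TaskDocLLM)
                .where(
                    TaskDocLLM.task_id == task_id,
                    TaskDocLLM.status == TaskStatus.processing,
                    TaskDocLLM.processing_started_at < timeout_dt,
                )
                .values(
                    status=TaskStatus.pending,
                    retry_count=TaskDocLLM.retry_count + 1,
                    processing_started_at=None,
                    result=None,
                )
            )
            result = session.execute(stmt)
            session.commit()
            # False means the row was not (or no longer) in a reclaimable state.
            return result.rowcount == 1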
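The reason `reclaim_task` puts the status and start-time conditions into the `WHERE` clause can be illustrated with a self-contained sketch. It uses the standard-library `sqlite3` module rather than MySQL and SQLAlchemy, and the table layout, integer timestamps, and `reclaim` function are simplified assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task (task_id INTEGER PRIMARY KEY, status TEXT, "
    "retry_count INTEGER, processing_started_at INTEGER)"
)
conn.execute("INSERT INTO task VALUES (1, 'processing', 0, 1000)")
conn.commit()


def reclaim(conn, task_id, timeout_border_ts):
    """Guarded update: only a row still in processing and old enough matches."""
    cur = conn.execute(
        "UPDATE task SET status = 'pending', retry_count = retry_count + 1, "
        "processing_started_at = NULL "
        "WHERE task_id = ? AND status = 'processing' "
        "AND processing_started_at < ?",
        (task_id, timeout_border_ts),
    )
    conn.commit()
    return cur.rowcount == 1


first = reclaim(conn, 1, timeout_border_ts=1400)
# A second reaper racing on the same task finds nothing left to update.
second = reclaim(conn, 1, timeout_border_ts=1400)
row = conn.execute(
    "SELECT status, retry_count FROM task WHERE task_id = 1"
).fetchone()
```

Because the check and the write happen in one statement, only one caller sees a row count of 1, so the retry counter is incremented once and the task is requeued once even when several reapers run concurrently.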
Add the reaper function reaper_loop, which picks out timed-out tasks and restores their status:
    import json
    import logging
    import time
    from datetime import datetime, timedelta

    # REAPER_INTERVAL_SECONDS is configured alongside PROCESSING_TIMEOUT_SECONDS.


    def reaper_loop():
        """Scan the processing queue and recover timed-out tasks."""
        logging.info(
            "doc_llm_reaper started, interval=%ss, timeout=%ss",
            REAPER_INTERVAL_SECONDS, PROCESSING_TIMEOUT_SECONDS,
        )
        while True:
            try:
                now_ts = int(time.time())
                timeout_border_ts = now_ts - PROCESSING_TIMEOUT_SECONDS
                # Must be in the same time zone as processing_started_at in the database.
                timeout_threshold_dt = datetime.utcnow() - timedelta(
                    seconds=PROCESSING_TIMEOUT_SECONDS
                )

                # An empty list simply skips the loop body.
                items = redis_client.lrange(TASK_QUEUE_PROCESSING_KEY, 0, -1)
                for raw in items:
                    try:
                        payload = json.loads(raw.decode("utf-8"))
                        task_id = int(payload["task_id"])
                        task_name = payload.get("task_name")
                    except Exception:
                        # Malformed item: it can never be processed, so drop it.
                        redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)
                        continue

                    start_ts_raw = redis_client.hget(TASK_PROCESSING_TS_KEY, task_id)
                    if start_ts_raw is None:
                        # No start time recorded yet. Note: if the worker died before
                        # writing it, this loop will keep skipping the task.
                        continue
                    start_ts = int(start_ts_raw)
                    if start_ts >= timeout_border_ts:
                        continue  # still within the timeout

                    logging.warning(
                        f"doc_llm_reaper: task {task_id} seems stuck, "
                        f"start_ts={start_ts}, now_ts={now_ts}"
                    )
                    # Update the database first; touch the queues only if it succeeds.
                    ok = task_service.reclaim_task(task_id, timeout_threshold_dt)
                    if not ok:
                        continue

                    redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)
                    redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)
                    new_payload = json.dumps(
                        {"task_id": task_id, "task_name": task_name},
                        ensure_ascii=False,
                    )
                    redis_client.lpush(TASK_QUEUE_READY_KEY, new_payload)
                    logging.info(
                        f"doc_llm_reaper: task {task_id} reclaimed and requeued to READY"
                    )
            except Exception:
                logging.exception("unexpected error in reaper loop")
            time.sleep(REAPER_INTERVAL_SECONDS)
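One way to unit-test the reaper's decision logic without a Redis server is to run a single pass over plain Python structures. The sketch below is an illustration under that assumption: `reap_once` and the `reclaim` callback are hypothetical names, with the callback standing in for the database update.

```python
def reap_once(processing, processing_ts, ready, now_ts, timeout, reclaim):
    """Run one reaper pass over in-memory structures; return requeued ids."""
    requeued = []
    for task_id in list(processing):  # iterate over a snapshot, like LRANGE
        start_ts = processing_ts.get(task_id)
        if start_ts is None or now_ts - start_ts <= timeout:
            continue  # no start time yet, or still within the timeout
        if not reclaim(task_id):
            continue  # the database refused, so leave the queues untouched
        processing.remove(task_id)
        processing_ts.pop(task_id, None)
        ready.insert(0, task_id)  # same end of the list that LPUSH uses
        requeued.append(task_id)
    return requeued


processing = [1, 2, 3]
processing_ts = {1: 1_000, 2: 1_900, 3: 900}
ready = []
# Stub database: pretend task 3 is no longer in a reclaimable state.
requeued = reap_once(
    processing, processing_ts, ready,
    now_ts=2_000, timeout=600,
    reclaim=lambda task_id: task_id != 3,
)
```

Task 1 is requeued, task 2 is still within the timeout, and task 3 stays in processing because the database step declined it, which is the ordering the reaper relies on: database first, queues second.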
Alongside the main worker loop, start a thread in the same process that runs the reaper in a loop:
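    import threading

    # setup_logging and init_llm are defined elsewhere in the project.


    def start_reaper_thread():
        # daemon=True lets the process exit without waiting for the reaper.
        reaper_thread = threading.Thread(
            target=reaper_loop, name="doc_llm_reaper", daemon=True
        )
        reaper_thread.start()
        return reaper_thread


    if __name__ == "__main__":
        setup_logging()
        init_llm()
        start_reaper_thread()
        worker_loop()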
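If the background loop ever needs to stop cleanly (for example in tests), one possible variation is to drive it with a `threading.Event` instead of `time.sleep`. This is a sketch of that alternative, not the approach used above; `run_periodically` and `tick` are hypothetical names.

```python
import threading


def run_periodically(tick, interval_seconds, stop_event):
    """Call tick() until stop_event is set, waiting between calls."""
    while not stop_event.is_set():
        tick()
        # wait() returns early once the event is set, unlike time.sleep().
        stop_event.wait(interval_seconds)


stop = threading.Event()
calls = []


def tick():
    calls.append(len(calls) + 1)
    if len(calls) == 3:
        stop.set()  # ask the loop to finish after the third pass


thread = threading.Thread(
    target=run_periodically, args=(tick, 0.01, stop),
    name="demo_reaper", daemon=True,
)
thread.start()
thread.join(timeout=5)
```

The loop exits right after the pass that sets the event, so shutdown does not have to wait out a full interval.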