动态

详情 返回 返回

基於PHP進程控制與Redis隊列的異步任務實踐——解決Excel導入接口超時問題 - 动态 详情

問題背景與解決方案

問題場景

在實現Excel數據導入功能時,遇到一個典型的生產者-消費者場景:

  1. 主流程:Excel文件解析 → 數據校驗 → 數據庫事務寫入
  2. 附加流程:將成功數據推送給第三方系統

當第三方接口響應緩慢時(實測平均耗時8-12秒),導致整體接口響應時間超出前端等待閾值,造成以下問題:

  • 前端顯示系統錯誤(HTTP 500)
  • 實際業務數據已完整入庫
  • 用户體驗與數據一致性存在割裂

解決方案演進

  1. 同步方案:直接順序執行(已存在問題)
  2. 隊列方案:Redis隊列 + 獨立消費者進程(最優解但需額外部署)
  3. 折中方案:PHP進程控制 + Redis臨時隊列(本文實現方案)

最終採用方案3,在保證系統輕量化的前提下實現異步處理,技術組合:

  • Redis List結構作為臨時存儲
  • PHP pcntl擴展進行進程控制
  • ThinkPHP6命令行組件實現消費邏輯

技術實現詳解

1. Redis隊列封裝(生產者端)

class RedisQueue extends RedisBase
{
    /**
     * 安全寫入隊列(支持複雜數據結構)
     * @param string $name 隊列名稱
     * @param mixed $value 支持字符串/數組/對象
     * @return int 隊列長度
     */
    public function rPush(string $name, $value)
    {
        $value = is_scalar($value) ? $value : json_encode($value);
        return $this->handle->rPush($this->getCacheKey($name), $value);
    }

    /**
     * 安全讀取隊列(自動反序列化)
     * @param string $name 隊列名稱
     * @return mixed 原始數據類型
     */
    public function lPop(string $name)
    {
        $value = $this->handle->lPop($this->getCacheKey($name));
        return json_decode($value, true) ?? $value;
    }
}

設計要點

  • 自動序列化/反序列化處理
  • 兼容標量值與複雜數據結構
  • 繼承RedisBase實現連接池管理

2. 異步觸發機制

private function send_import_sku_to_yjt($data)
{
    $redis = new RedisQueue();
    $queueName = 'yjt_sku_import';
    
    // 數據分批入隊(避免大消息體)
    foreach (array_chunk($data, 100) as $batch) {
        $redis->rPush($queueName, $batch);
    }

    // 構建異步命令
    $rootPath = root_path();
    $command = sprintf(
        'php74 %sthink jiayi sendSkuToYjt -r %s -p %s &> /dev/null &',
        $rootPath,
        escapeshellarg($queueName),
        escapeshellarg(json_encode(['operator' => $this->getCurrentUser()]))
    );

    // 非阻塞執行
    pclose(popen($command, 'r')); 
}

關鍵技術點

  • popen():創建並行進程,非阻塞執行
  • escapeshellarg():防止命令注入攻擊
  • 後台運行符&:脱離當前進程控制
  • 輸出重定向:&> /dev/null丟棄日誌

3. 命令行消費者(守護進程)

class Jiayi extends Command
{
    public function sendSkuToYjt(Input $input)
    {
        $queueName = $input->getOption('redis_name');
        $context = json_decode($input->getOption('params'), true);

        $redis = new RedisQueue();
        $retryCount = 0;

        while ($batch = $redis->lPop($queueName)) {
            try {
                $this->processBatch($batch, $context);
                $retryCount = 0; // 重置重試計數器
            } catch (Exception $e) {
                if ($retryCount++ < 3) {
                    $redis->rPush($queueName, $batch); // 重新入隊
                    sleep(pow(2, $retryCount)); // 指數退避
                } else {
                    $this->logError($e, $batch);
                }
            }
        }
    }

    private function processBatch($batch, $context)
    {
        $yjtData = YJTSku::convertYJTSkuData($batch);
        $yjtData['yjt_token'] = YJTUtil::getToken($batch[0]['company_id']);
        
        AbsYJT::sendYJT(
            YJTUrlKey::ADD_SKU,
            $yjtData,
            $context['operator']
        );
    }
}

消費者特性

  • 失敗重試機制(3次指數退避)
  • 上下文傳遞(操作人信息)
  • 異常處理與日誌記錄
  • 批量處理支持

系統架構原理

image.png

方案優勢分析

  1. 響應速度優化

    • 主流程耗時從12s+降至200ms內
    • 前端立即獲得成功反饋
  2. 系統可靠性

    • Redis持久化保證數據不丟失
    • 重試機制應對第三方系統不穩定
    • 進程隔離避免主流程崩潰
  3. 資源利用率

    • 按需創建消費者進程
    • 無常駐進程佔用資源
    • 可平滑過渡到專業隊列系統
  4. 可觀測性

    • Redis隊列長度監控
    • 失敗記錄與告警機制
    • 操作日誌審計追蹤

生產環境建議

  1. 安全增強

    // 增加隊列名前綴隔離
    private function getCacheKey($name)
    {
        return config('app.env').':queue:'.$name;
    }
    
    // 命令執行增加權限校驗
    if (!in_array(get_current_user(), ['www-data', 'nginx'])) {
        exit('Permission denied');
    }
  2. 性能調優

    • 調整PHP-FPM的max_children配置
    • 設置Redis內存淘汰策略為volatile-lru
    • 監控隊列堆積告警(通過Redis的LLEN命令)
  3. 高可用方案

    • 部署多個消費者實例
    • 使用Supervisor進程管理
    • 設置Redis哨兵模式

本方案在保證系統輕量化的前提下,有效解決了同步接口的超時問題。後續若業務量增長,可通過以下步驟平滑升級:

  1. 引入RabbitMQ/Kafka專業消息隊列
  2. 部署獨立的消費者集羣
  3. 增加流量控制與熔斷機制

該模式適用於中小型系統的異步任務處理,特別是臨時性、低頻次的業務場景,能有效平衡開發成本與系統性能。

Add a new 评论

Some HTML is okay.