Stories

Detail Return Return

think-queue隊列擴展實戰 - Stories Detail

topthink/think-queue 是 ThinkPHP 框架的官方隊列擴展,用於處理異步任務(如訂單通知、郵件發送、數據同步等),避免因耗時操作阻塞主流程。以下是其常見用法和核心功能説明:

一. 安裝與配置

  1. 安裝擴展

    composer require topthink/think-queue
  2. 配置隊列驅動
    在 config/queue.php 中配置隊列驅動(支持 sync 同步、redis、database 等):

    return [
     'default'     => 'redis', // 默認隊列驅動
     'connections' => [
         'sync' => [
             'type' => 'sync', // 同步執行(用於調試)
         ],
         'redis' => [
             'type'       => 'redis',
             'host'       => '127.0.0.1',
             'port'       => 6379,
             'password'   => '',
             'select'     => 12, // 數據庫編號
             'timeout'    => 0, // redis連接的超時時間
             'persistent' => false,
             //'prefix'     => 'think_queue:', // 隊列鍵前綴
         ],       
     ],
    ];

二. 創建任務類

任務類需藉助 think\queue\Job,並實現 fire 方法(任務執行邏輯)和 failed 方法(任務失敗處理)。
示例: app\device\jobs\DeviceJob(設備訂單執行任務)

namespace app\device\jobs;

use think\Log;
use think\queue\Job;

class DeviceJob
{
    /**
     * 任務執行邏輯
     * @param Job $job 任務對象
     * @param array $data 傳遞的參數(如訂單ID、手機號等)
     */
    public function fire(Job $job, array $data)
    {

        try {
            // 實際業務邏輯處理
            $result = $this->doBusiness($data);
            if ($result) {
                // 任務執行成功,刪除任務
                $job->delete();
                echo "任務執行成功\n";
            } else {
                // 執行失敗,判斷是否需要重試
                if ($job->attempts() < 3) { // 最多重試3次
                    $job->release(5); // 5秒後重試
                    echo "任務執行失敗,將在5秒後重試\n";
                } else {
                    // 超過重試次數,標記失敗
                    $job->delete();
                    $this->failed($data); // 調用失敗處理
                    echo "任務多次失敗,已放棄\n";
                }
            }
        } catch (\Exception $e) {
            // 捕獲異常,按失敗處理
            $job->release(10); // 10秒後重試
            echo "執行異常:{$e->getMessage()}\n";
        }
    }

    //執行業務邏輯
    public function doBusiness($data)
    {
        // 業務邏輯處理
    
        return true;
    }


    /**
     * 任務失敗處理(如記錄日誌、人工干預)
     * @param array $data 任務參數
     */
    public function failed($data)
    {
        // 記錄失敗日誌(可寫入數據庫或文件)
        Log::error("任務失敗:" . json_encode($data));
    }
}

三. 推送任務到隊列

在控制器或服務中推送任務到隊列。

public function test(Request $request)
{
    //在測試方法中推送任務到隊列
    \queue(app\device\jobs\DeviceJob::class, ['device_sn' => 'A8:4F:A4:CA:B9:AB']);
}

上面的test()方法執行完成後在redis隊列中就會多一條記錄:
image.png

四. 啓動隊列worker進程

任務推送後需啓動 worker 進程消費隊列,通過命令行執行:

  1. 基本命令(消費默認隊列)

    php think queue:work
  2. 消費指定隊列(推薦)

    php think queue:work --queue order_notify,order_check
    # 同時消費 order_notify 和 order_check 隊列,優先級按順序排列
  3. 守護進程模式(生產環境)

    php think queue:work --daemon --queue order_notify
    # --daemon:以守護進程運行(進程常駐,自動重啓)
    # 退出需用 kill 命令終止進程
user avatar wobushiliaojian Avatar danieldx Avatar euphoria Avatar
Favorites 3 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.