H2HI 朋友們好久不見,近期又要開始更新了,先帶來一篇如何使用thinkphp8如何使用think-queue包的教程,來實現Redis的隊列使用

第一步:安裝topthink/think-queue包--composer

composer require topthink/think-queue

注意,確保您的php已經安裝了redis擴展並且redis服務也啓動

第二步:配置隊列驅動

在config目錄下,修改queue.php配置文件(一般安裝完成會自動生成,如果沒有就手動創建一個)。示例配置如下:

<?php
return [
    'default' => 'redis', // 默認隊列驅動
    'connections' => [
        'redis' => [
            'type' => 'redis',
            'queue' => 'default',
            'host' => '127.0.0.1',
            'port' => 6379,
            'password' => '', // 如果有密碼則填寫
            'select' => 0, // 數據庫索引,默認0
            'timeout' => 0,
            'persistent' => false, // 是否長連接
            'expire' => 60, // 任務執行超時時間,單位秒
            'retry_after' => 60, // 任務重試時間,單位秒
        ],
    ],
    'failed' => [
        'type' => 'None', // 失敗任務存儲類型,None為不存儲,可配置為database、redis等
        'table' => 'failed_jobs', // 如果類型為database,則需指定表名
    ],
];

第三步:創建任務類 任務類可以放在app目錄下的job目錄中,每個任務類都需要實現think\queue\Job接口,或者更簡單的使用think\queue\ShouldQueue接口。並實現fire方法

例如:我們創建一個處理髮送郵件的任務類,app\job\SendEmail.php

<?php

namespace app\Job;
use think\queue\Job;

class SendEmail
{
    /**
     * 使用 fire 方法 - 這是 ThinkPHP 隊列的標準方法
     */
    public function fire(Job $job, $data)
    {
        echo "開始處理郵件任務,收件人:" . ($data['to'] ?? '未知') . "\n";

        try {
            // 處理郵件發送邏輯
            $result = $this->sendEmail($data);

            if ($result) {
                // 任務成功,刪除任務
                $job->delete();
                echo "發送郵件成功,收件人:" . ($data['to'] ?? '未知') . "\n";
                return true;
            } else {
                // 任務失敗,根據重試次數決定
                if ($job->attempts() > 3) {
                    $job->delete();
                    echo "郵件發送失敗,已達到最大重試次數\n";
                } else {
                    $job->release(10); // 10秒後重試
                    echo "郵件發送失敗,10秒後重試\n";
                }
                return false;
            }
        } catch (\Exception $e) {
            // 捕獲異常
            echo "郵件發送異常: " . $e->getMessage() . "\n";

            if ($job->attempts() > 3) {
                $job->delete();
            } else {
                $job->release(10);
            }
            return false;
        }
    }

    /**
     * 發送郵件邏輯
     */
    private function sendEmail($data)
    {
        // 模擬郵件發送
        echo "正在發送郵件到: " . ($data['to'] ?? '未知') . "\n";
        sleep(5); // 模擬發送時間

        // 模擬發送結果(90%成功率)
        return rand(1, 10) > 0;
    }

第四步:推送任務到隊列

在需要推送任務的地方,一般是在控制器中,使用Queue的facade來推送任務

<?php

namespace app\controller;

use app\BaseController;
use think\facade\Queue;

class Index extends BaseController
{
    public function index()
    {
        $emailData = [
            'to' => 'user@example.com',
            'subject' => '測試郵件',
            'content' => '這是一封測試郵件',
            'from' => 'noreply@example.com'
        ];

        // 投遞任務到默認隊列
        $job = 'app\job\SendEmail';
        $queue = 'default';

        $jobId = Queue::push($job, $emailData, $queue);

        if ($jobId) {
            return json([
                'code' => 200,
                'message' => '郵件任務投遞成功',
                'job_id' => $jobId,
                'queue' => $queue
            ]);
        } else {
            return json([
                'code' => 500,
                'message' => '任務投遞失敗'
            ]);
        }
    }

    /**
     * 檢查隊列狀態
     */
    public function queueStatus()
    {
        try {
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);

            $queues = ['default'];
            $status = [];

            foreach ($queues as $queue) {
                $waiting = $redis->lLen("queues:{$queue}");
                $delayed = $redis->zCard("queues:{$queue}:delayed");
                $reserved = $redis->zCard("queues:{$queue}:reserved");
                $failed = $redis->lLen("queues:{$queue}:failed");

                $status[$queue] = [
                    'waiting' => $waiting,
                    'delayed' => $delayed,
                    'reserved' => $reserved,
                    'failed' => $failed
                ];
            }

            return json([
                'code' => 200,
                'data' => $status
            ]);
        } catch (\Exception $e) {
            return json([
                'code' => 500,
                'message' => 'Redis連接失敗: ' . $e->getMessage()
            ]);
        }
    }
}

完成以上內容,訪問對應控制器即可像隊列發佈隊列任務,但是這個時候還沒有監聽,最後我們要啓動隊列監聽器。

第五步:創建command自定義指令

thinkphp支持自定義指令,我們只需要在app目錄下創建command文件夾,新建文件Work.php 內容如下:

<?php

namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Queue;

class Work extends Command
{
    protected function configure()
    {
        $this -> setName('queue:work')
            ->setDescription("Process the next job on a queue");
    }

    protected function execute(Input $input, Output $output)
    {
        Queue::worker('redis');
    }
}

最後啓動隊列監聽器

php think queue:work --queue default #--queue default可以不輸入,直接啓動,現在是指定啓動對應的default隊列監聽,default可以在發佈任務的時候進行自定義 
#也可以使用守護進程模式啓動,讓隊列監聽器一直在後台運行
php think queue:work --daemon --queue default

最終效果: image.png