H2HI 朋友們好久不見,近期又要開始更新了,先帶來一篇如何使用thinkphp8如何使用think-queue包的教程,來實現Redis的隊列使用
第一步:安裝topthink/think-queue包--composer
composer require topthink/think-queue
注意,確保您的php已經安裝了redis擴展並且redis服務也啓動
第二步:配置隊列驅動
在config目錄下,修改queue.php配置文件(一般安裝完成會自動生成,如果沒有就手動創建一個)。示例配置如下:
<?php
return [
'default' => 'redis', // 默認隊列驅動
'connections' => [
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '', // 如果有密碼則填寫
'select' => 0, // 數據庫索引,默認0
'timeout' => 0,
'persistent' => false, // 是否長連接
'expire' => 60, // 任務執行超時時間,單位秒
'retry_after' => 60, // 任務重試時間,單位秒
],
],
'failed' => [
'type' => 'None', // 失敗任務存儲類型,None為不存儲,可配置為database、redis等
'table' => 'failed_jobs', // 如果類型為database,則需指定表名
],
];
第三步:創建任務類 任務類可以放在app目錄下的job目錄中,每個任務類都需要實現think\queue\Job接口,或者更簡單的使用think\queue\ShouldQueue接口。並實現fire方法
例如:我們創建一個處理髮送郵件的任務類,app\job\SendEmail.php
<?php
namespace app\Job;
use think\queue\Job;
class SendEmail
{
/**
* 使用 fire 方法 - 這是 ThinkPHP 隊列的標準方法
*/
public function fire(Job $job, $data)
{
echo "開始處理郵件任務,收件人:" . ($data['to'] ?? '未知') . "\n";
try {
// 處理郵件發送邏輯
$result = $this->sendEmail($data);
if ($result) {
// 任務成功,刪除任務
$job->delete();
echo "發送郵件成功,收件人:" . ($data['to'] ?? '未知') . "\n";
return true;
} else {
// 任務失敗,根據重試次數決定
if ($job->attempts() > 3) {
$job->delete();
echo "郵件發送失敗,已達到最大重試次數\n";
} else {
$job->release(10); // 10秒後重試
echo "郵件發送失敗,10秒後重試\n";
}
return false;
}
} catch (\Exception $e) {
// 捕獲異常
echo "郵件發送異常: " . $e->getMessage() . "\n";
if ($job->attempts() > 3) {
$job->delete();
} else {
$job->release(10);
}
return false;
}
}
/**
* 發送郵件邏輯
*/
private function sendEmail($data)
{
// 模擬郵件發送
echo "正在發送郵件到: " . ($data['to'] ?? '未知') . "\n";
sleep(5); // 模擬發送時間
// 模擬發送結果(90%成功率)
return rand(1, 10) > 0;
}
第四步:推送任務到隊列
在需要推送任務的地方,一般是在控制器中,使用Queue的facade來推送任務
<?php
namespace app\controller;
use app\BaseController;
use think\facade\Queue;
class Index extends BaseController
{
public function index()
{
$emailData = [
'to' => 'user@example.com',
'subject' => '測試郵件',
'content' => '這是一封測試郵件',
'from' => 'noreply@example.com'
];
// 投遞任務到默認隊列
$job = 'app\job\SendEmail';
$queue = 'default';
$jobId = Queue::push($job, $emailData, $queue);
if ($jobId) {
return json([
'code' => 200,
'message' => '郵件任務投遞成功',
'job_id' => $jobId,
'queue' => $queue
]);
} else {
return json([
'code' => 500,
'message' => '任務投遞失敗'
]);
}
}
/**
* 檢查隊列狀態
*/
public function queueStatus()
{
try {
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
$queues = ['default'];
$status = [];
foreach ($queues as $queue) {
$waiting = $redis->lLen("queues:{$queue}");
$delayed = $redis->zCard("queues:{$queue}:delayed");
$reserved = $redis->zCard("queues:{$queue}:reserved");
$failed = $redis->lLen("queues:{$queue}:failed");
$status[$queue] = [
'waiting' => $waiting,
'delayed' => $delayed,
'reserved' => $reserved,
'failed' => $failed
];
}
return json([
'code' => 200,
'data' => $status
]);
} catch (\Exception $e) {
return json([
'code' => 500,
'message' => 'Redis連接失敗: ' . $e->getMessage()
]);
}
}
}
完成以上內容,訪問對應控制器即可像隊列發佈隊列任務,但是這個時候還沒有監聽,最後我們要啓動隊列監聽器。
第五步:創建command自定義指令
thinkphp支持自定義指令,我們只需要在app目錄下創建command文件夾,新建文件Work.php 內容如下:
<?php
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Queue;
class Work extends Command
{
protected function configure()
{
$this -> setName('queue:work')
->setDescription("Process the next job on a queue");
}
protected function execute(Input $input, Output $output)
{
Queue::worker('redis');
}
}
最後啓動隊列監聽器
php think queue:work --queue default #--queue default可以不輸入,直接啓動,現在是指定啓動對應的default隊列監聽,default可以在發佈任務的時候進行自定義
#也可以使用守護進程模式啓動,讓隊列監聽器一直在後台運行
php think queue:work --daemon --queue default
最終效果: