交換器
RabbitMQ 消息傳遞模型的核心思想是生產者從不直接向隊列發送任何消息。生產者只將消息發送到 Exchange 交換器中,並不知道消息是否會被傳送到隊列。交換器負責接收生產者生產的消息,並通過一定路由規則將消息發送到指定的隊列,起到一個傳遞的作用
類型介紹
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 這四種(AMQP規範裏還提到兩種 Exchange Type,分別為 system 與自定義,這裏不予以描述)。
fanout
fanout 類型的 Exchange 路由規則非常。它會把所有發送到該 Exchange 的消息路由到所有與它綁定的 Queue 中。這種模式在 RabboitMQ 官方介紹中稱之為:發佈/訂閲
圖 1 fanout Exchange
來看下官方給出的代碼示例:
emit_log.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
receive_logs.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'exchange_name';
$channel->exchange_declare($exchange_name, 'fanout', false, false, false);
// 獲取系統返回的隊列名稱
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 將 Queue 和 Exchange 綁定
$channel->queue_bind($queue_name, $exchange_name);
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
執行 shell 命令
php receive_logs.php
php emit_log.php
direct
direct 類型的 Exchange 路由規則也很簡單,它會把消息路由到那些 binding key 與
routing key 完全匹配的 Queue 中。官方説明:direct
圖 2 direct Exchange
以上圖的配置為例,我們以 routingKey=”error”發送消息到 Exchange,則消息會路由到 Queue1(amqp.gen-S9b…,這是由 RabbitMQ 自動生成的 Queue 名稱)和 Queue2(amqp.gen-Agl…);如果我們以 routingKey=”info”或 routingKey=”warning”來發送消息,則消息只會路由到 Queue2。如果我們以其他 routingKey 發送消息,則消息不會路由到這兩個 Queue 中。
來看下官方給出的代碼示例:
emit_log.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
// basic_publish 函數中第三個參數指定 routingKey,將消息、交換器和 routingKey 綁定到一起
$channel->basic_publish($msg, $exchange_name, $severity);
echo ' [x] Sent ', $severity, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'direct_logs';
$channel->exchange_declare($exchange_name, 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
// 循環調用 queue_bind 函數,第三個參數 routingKey,將隊列、交換器和 routingKey 綁定到一起,交換器會根據 routingKey 將消息路由到綁定的隊列中
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, $exchange_name, $severity);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
執行 shell 命令:
# 生產者:
php emit_log.php error "routingKey error"
php emit_log.php warning "routingKey warning"
php emit_log.php info "routingKey info"
php emit_log.php info warning error "routingKey info warning error"
# 消費者:
# 消費者可以執行多個終端
php receive_logs.php info warning error
php receive_logs.php info
php receive_logs.php warning
php receive_logs.php warning error
topic
direct 類型的 Exchange 路由規則是完全匹配 binding key 與 routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic 類型的 Exchange 在匹配規則上進行了擴展,它與 direct 類型的 Exchage 相似,也是將消息路由到 binding key 與 routing key 相匹配的 Queue 中,但這裏的匹配規則有些不同。官方解釋:topic。它約定:
- routing key 為一個英文句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key 與 routing key 一樣也是句點號“. ”分隔的字符串
- binding key 中可以存在兩種特殊字符“”與“#”,用於做模糊匹配,其中“”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個
圖 3 topic Exchange
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到 Q1 與 Q2,routingKey=”lazy.orange.fox”的消息會路由到 Q1,routingKey=”lazy.brown.fox”的消息會路由到 Q2,routingKey=”lazy.pink.rabbit”的消息會路由到 Q2(只會投遞給 Q2 一次,雖然這個 routingKey 與 Q2 的兩個 bindingKey 都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何 bindingKey。
來看下官方給出的代碼示例:
emit_log.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange_name, $routing_key);
echo ' [x] Sent ', $routing_key, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchange_name = 'topic_logs';
$channel->exchange_declare($exchange_name, 'topic', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
執行 shell 命令,可自行測試:
# 所有
php receive_logs.php "#"
# 路由到 Q1 與 Q2
php receive_logs.php "quick.orange.rabbit"
# 路由到 Q1
php receive_logs.php "lazy.orange.fox"
# 路由到 Q2
php receive_logs.php "lazy.brown.fox*"
# 路由到 Q1 與 Q2
php emit_log.php "quick.orange.rabbit" "Route to Q1 and Q2 at the same time"
headers
headers 類型的 Exchange 不依賴於 routing key 與 binding key 的匹配規則來路由消
息,而是根據發送的消息內容中的 headers 屬性進行匹配。
在綁定 Queue 與 Exchange 時指定一組鍵值對;當消息發送到 Exchange 時,RabbitMQ 會取
到該消息的 headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配 Queue 與
Exchange 綁定時指定的鍵值對;如果完全匹配則消息會路由到該 Queue,否則不會路由到該
Queue。
headers 類型的交換器性能會很差,而且也不實用,基本上不會看到它的存在。