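Chapter Goals
I. Theory
1. ASP.NET Core Integration Patterns
When integrating RabbitMQ into an ASP.NET Core application, we need to consider several key aspects:
2. Lifetime Management
3. Hosted Services
ASP.NET Core provides the IHostedService interface and the BackgroundService base class for implementing long-running background tasks. They are a natural fit for hosting RabbitMQ consumers.
4. Messaging Patterns in a Microservice Architecture
- Asynchronous command: sends an instruction without expecting an immediate response.
- Event notification: broadcasts a state change.
- Request-response: similar to RPC, but carried over the message broker.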
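The three patterns differ mainly in who is addressed and whether a reply is expected. The following is a minimal sketch of that difference, not code from the order system itself. The Envelope record, the MessagePatterns helper and the routing-key conventions are hypothetical names introduced for illustration. ReplyTo and CorrelationId mirror the AMQP message properties of the same names, which a request-response exchange typically relies on.

```csharp
#nullable enable
using System;

// Hypothetical envelope type for illustration only; it is not part of RabbitMQ.Client.
public sealed record Envelope(
    string RoutingKey,
    string MessageType,
    string? ReplyTo = null,
    string? CorrelationId = null);

public static class MessagePatterns
{
    // Asynchronous command: addressed to one logical handler, no reply expected.
    public static Envelope Command(string target, string name) =>
        new($"{target}.command.{name}", name);

    // Event notification: states that something already happened; any number of subscribers may bind.
    public static Envelope Event(string source, string name) =>
        new($"{source}.{name}", name);

    // Request-response: carries a reply queue and a correlation id so the caller can match the answer.
    public static Envelope Request(string target, string name, string replyQueue) =>
        new($"{target}.rpc.{name}", name, replyQueue, Guid.NewGuid().ToString());
}
```

With these assumed conventions, `MessagePatterns.Event("order", "created")` yields the routing key `order.created`, which is the key the publisher in this chapter uses for new orders.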
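II. Hands-on: Building an Order-Processing Microservice
We will create a complete order-processing system, consisting of:
Step 1: Create the project structure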
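# Create the solution
dotnet new sln -n OrderSystem
# Create the Web API project and the class libraries
dotnet new webapi -n Order.API
dotnet new classlib -n Order.Core
dotnet new classlib -n Order.Infrastructure
# The processor hosts a BackgroundService and calls host.Run(), so it needs the worker template rather than classlib
dotnet new worker -n OrderProcessor.Service
# Add the projects to the solution
dotnet sln add Order.API/Order.API.csproj
dotnet sln add Order.Core/Order.Core.csproj
dotnet sln add Order.Infrastructure/Order.Infrastructure.csproj
dotnet sln add OrderProcessor.Service/OrderProcessor.Service.csproj
# Add project references
dotnet add Order.API reference Order.Core
dotnet add Order.API reference Order.Infrastructure
dotnet add OrderProcessor.Service reference Order.Core
dotnet add OrderProcessor.Service reference Order.Infrastructure
dotnet add Order.Infrastructure reference Order.Core
# Add NuGet packages
# The code in this chapter uses IModel, which belongs to the RabbitMQ.Client 6.x API (7.x replaced it with IChannel), so pin a 6.x version
cd Order.API
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add package Microsoft.Extensions.Diagnostics.HealthChecks
# The AddRabbitMQ() health check used in Program.cs comes from this package; choose a release built against RabbitMQ.Client 6.x
dotnet add package AspNetCore.HealthChecks.Rabbitmq
cd ../Order.Infrastructure
dotnet add package RabbitMQ.Client --version 6.8.1
dotnet add package Microsoft.Extensions.Configuration
# ILogger<T> is used by the infrastructure services
dotnet add package Microsoft.Extensions.Logging.Abstractions
cd ../OrderProcessor.Service
dotnet add package RabbitMQ.Client --version 6.8.1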
Step 2: Define the domain model (Order.Core)
Models/Order.cs
namespace Order.Core.Models
{
public class Order
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string CustomerId { get; set; }
public string ProductId { get; set; }
public int Quantity { get; set; }
public decimal TotalAmount { get; set; }
public OrderStatus Status { get; set; } = OrderStatus.Pending;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? ProcessedAt { get; set; }
}
public enum OrderStatus
{
Pending,
Processing,
Completed,
Failed,
Cancelled
}
}
Messages/OrderMessage.cs
using Order.Core.Models; // OrderStatus is declared in the Models namespace
namespace Order.Core.Messages
{
public class OrderMessage
{
public string OrderId { get; set; }
public string CustomerId { get; set; }
public string ProductId { get; set; }
public int Quantity { get; set; }
public decimal TotalAmount { get; set; }
public string Action { get; set; } // "create", "cancel"
}
public class OrderStatusMessage
{
public string OrderId { get; set; }
public OrderStatus Status { get; set; }
public string Message { get; set; }
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
}
Step 3: Infrastructure layer (Order.Infrastructure)
Services/IRabbitMQConnection.cs
using RabbitMQ.Client;
namespace Order.Infrastructure.Services
{
public interface IRabbitMQConnection : IDisposable
{
bool IsConnected { get; }
IModel CreateModel();
bool TryConnect();
}
}
Services/RabbitMQConnection.cs
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace Order.Infrastructure.Services
{
public class RabbitMQConnection : IRabbitMQConnection
{
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<RabbitMQConnection> _logger;
private IConnection _connection;
private bool _disposed;
private readonly object _syncRoot = new object();
public RabbitMQConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQConnection> logger)
{
_connectionFactory = connectionFactory;
_logger = logger;
}
public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel();
}
public bool TryConnect()
{
lock (_syncRoot)
{
if (IsConnected) return true;
_logger.LogInformation("RabbitMQ Client is trying to connect");
try
{
_connection = _connectionFactory.CreateConnection();
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events",
_connection.Endpoint.HostName); // IConnectionFactory exposes no HostName member, so read it from the open connection
return true;
}
catch (BrokerUnreachableException ex)
{
_logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
return false;
}
catch (SocketException ex)
{
_logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
return false;
}
}
}
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is blocked. Reason: {Reason}", e.Reason);
// Reconnection logic can be implemented here
TryConnect();
}
private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed) return;
_logger.LogWarning(e.Exception, "A RabbitMQ connection threw an exception. Trying to re-connect...");
// Reconnection logic can be implemented here
TryConnect();
}
private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
// Reconnection logic can be implemented here
TryConnect();
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection?.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex, "Error disposing RabbitMQ connection");
}
}
}
}
Services/IOrderPublisher.cs
using Order.Core.Messages;
namespace Order.Infrastructure.Services
{
public interface IOrderPublisher
{
Task PublishOrderCreatedAsync(OrderMessage order);
Task PublishOrderStatusAsync(OrderStatusMessage status);
}
}
Services/OrderPublisher.cs
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Order.Core.Messages;
using RabbitMQ.Client;
namespace Order.Infrastructure.Services
{
public class OrderPublisher : IOrderPublisher
{
private readonly IRabbitMQConnection _connection;
private readonly ILogger<OrderPublisher> _logger;
private const string ExchangeName = "order.events";
private const string OrderCreatedRoutingKey = "order.created";
private const string OrderStatusRoutingKey = "order.status";
public OrderPublisher(IRabbitMQConnection connection, ILogger<OrderPublisher> logger)
{
_connection = connection;
_logger = logger;
// Make sure the exchange and queues exist
InitializeInfrastructure();
}
private void InitializeInfrastructure()
{
// Connect first: CreateModel() throws if the connection has not been opened yet
if (!_connection.IsConnected)
{
_connection.TryConnect();
}
using var channel = _connection.CreateModel();
// Declare the topic exchange
channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true);
// Declare the order-created queue
channel.QueueDeclare("order.created.queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("order.created.queue", ExchangeName, OrderCreatedRoutingKey);
// Declare the order-status queue
channel.QueueDeclare("order.status.queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("order.status.queue", ExchangeName, OrderStatusRoutingKey);
_logger.LogInformation("RabbitMQ infrastructure initialized");
}
public async Task PublishOrderCreatedAsync(OrderMessage order)
{
await PublishMessageAsync(order, OrderCreatedRoutingKey, "OrderCreated");
}
public async Task PublishOrderStatusAsync(OrderStatusMessage status)
{
await PublishMessageAsync(status, OrderStatusRoutingKey, "OrderStatus");
}
private async Task PublishMessageAsync<T>(T message, string routingKey, string messageType)
{
if (!_connection.IsConnected)
{
_connection.TryConnect();
}
using var channel = _connection.CreateModel();
var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.ContentType = "application/json";
properties.Type = messageType;
try
{
channel.BasicPublish(
exchange: ExchangeName,
routingKey: routingKey,
mandatory: true,
basicProperties: properties,
body: body);
_logger.LogInformation("Published {MessageType} message for Order {OrderId}",
messageType, GetOrderId(message));
}
catch (Exception ex)
{
_logger.LogError(ex, "Error publishing {MessageType} message for Order {OrderId}",
messageType, GetOrderId(message));
throw;
}
await Task.CompletedTask;
}
private static string GetOrderId<T>(T message)
{
return message switch
{
OrderMessage order => order.OrderId,
OrderStatusMessage status => status.OrderId,
_ => "unknown"
};
}
}
}
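The publisher declares a topic exchange but binds both queues with exact routing keys, so wildcards never come into play. To illustrate what a topic binding would additionally allow, here is a minimal, dependency-free sketch of topic matching as RabbitMQ defines it: `*` stands for exactly one dot-separated word and `#` for zero or more words. TopicMatcher is a hypothetical helper written for this explanation; the broker performs the matching itself and the client library exposes no such class.

```csharp
using System;

// Hypothetical helper that mimics topic-exchange matching for illustration.
public static class TopicMatcher
{
    // Returns true when a binding pattern would route the given routing key.
    public static bool Matches(string pattern, string routingKey) =>
        Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: it matches only if the key is exhausted too.
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' absorbs zero or more words, so try every possible split point.
            for (int next = j; next <= k.Length; next++)
            {
                if (Match(p, i + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (j == k.Length) return false;

        // '*' accepts any single word; otherwise the words must be equal.
        return (p[i] == "*" || p[i] == k[j]) && Match(p, i + 1, k, j + 1);
    }
}
```

Under these rules, a queue bound with `order.*` would receive both `order.created` and `order.status`, while `order.#` would also receive longer keys such as `order.status.changed`.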
Step 4: Configure the Order.API project
appsettings.json
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "myuser",
"Password": "mypassword",
"Port": 5672,
"VirtualHost": "/"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
}
Program.cs
using Order.API.Controllers;
using Order.API.Services;
using Order.Core.Models;
using Order.Infrastructure.Services;
using RabbitMQ.Client;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
var configuration = sp.GetRequiredService<IConfiguration>();
return new ConnectionFactory
{
HostName = configuration["RabbitMQ:HostName"],
UserName = configuration["RabbitMQ:UserName"],
Password = configuration["RabbitMQ:Password"],
Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
DispatchConsumersAsync = true
};
});
// Register RabbitMQ services
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
builder.Services.AddScoped<IOrderPublisher, OrderPublisher>();
builder.Services.AddScoped<IOrderService, OrderService>();
// Add Health Checks
builder.Services.AddHealthChecks()
.AddRabbitMQ(provider =>
{
var factory = provider.GetRequiredService<IConnectionFactory>();
return factory.CreateConnection();
}, name: "rabbitmq");
// Add hosted service for status updates consumer
builder.Services.AddHostedService<OrderStatusConsumerService>();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
// Add health check endpoint
app.MapHealthChecks("/health");
app.Run();
Services/IOrderService.cs
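namespace Order.API.Services
{
// Imported inside the namespace so that "Order" binds to the model type rather than to the root "Order" namespace
using Order.Core.Models;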
public interface IOrderService
{
Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice);
Task<Order?> GetOrderAsync(string orderId);
Task UpdateOrderStatusAsync(string orderId, OrderStatus status);
}
}
Services/OrderService.cs
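using Order.Core.Messages;
using Order.Infrastructure.Services;
namespace Order.API.Services
{
// Imported inside the namespace so that "Order" binds to the model type rather than to the root "Order" namespace
using Order.Core.Models;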
public class OrderService : IOrderService
{
private readonly IOrderPublisher _orderPublisher;
private readonly ILogger<OrderService> _logger;
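// In-memory store for demonstration only (use a database in production).
// ConcurrentDictionary because concurrent requests share this static instance.
private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, Order> _orders = new();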
public OrderService(IOrderPublisher orderPublisher, ILogger<OrderService> logger)
{
_orderPublisher = orderPublisher;
_logger = logger;
}
public async Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice)
{
var order = new Order
{
CustomerId = customerId,
ProductId = productId,
Quantity = quantity,
TotalAmount = quantity * unitPrice,
Status = OrderStatus.Pending
};
// Save to the in-memory store
_orders[order.Id] = order;
// Publish the order-created event
var orderMessage = new OrderMessage
{
OrderId = order.Id,
CustomerId = order.CustomerId,
ProductId = order.ProductId,
Quantity = order.Quantity,
TotalAmount = order.TotalAmount,
Action = "create"
};
await _orderPublisher.PublishOrderCreatedAsync(orderMessage);
_logger.LogInformation("Order {OrderId} created and published", order.Id);
return order;
}
public Task<Order?> GetOrderAsync(string orderId)
{
_orders.TryGetValue(orderId, out var order);
return Task.FromResult(order);
}
public async Task UpdateOrderStatusAsync(string orderId, OrderStatus status)
{
if (_orders.TryGetValue(orderId, out var order))
{
order.Status = status;
order.ProcessedAt = DateTime.UtcNow;
// Publish the status update
var statusMessage = new OrderStatusMessage
{
OrderId = orderId,
Status = status,
Message = $"Order {status.ToString().ToLower()}"
};
await _orderPublisher.PublishOrderStatusAsync(statusMessage);
_logger.LogInformation("Order {OrderId} status updated to {Status}", orderId, status);
}
}
}
}
Services/OrderStatusConsumerService.cs
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Options;
using Order.API.Services;
using Order.Core.Messages;
using Order.Infrastructure.Services;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Order.API.Services
{
public class OrderStatusConsumerService : BackgroundService
{
private readonly IRabbitMQConnection _connection;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OrderStatusConsumerService> _logger;
private IModel _channel;
private const string QueueName = "order.status.queue";
public OrderStatusConsumerService(
IRabbitMQConnection connection,
IServiceProvider serviceProvider,
ILogger<OrderStatusConsumerService> logger)
{
_connection = connection;
_serviceProvider = serviceProvider;
_logger = logger;
InitializeChannel();
}
private void InitializeChannel()
{
if (!_connection.IsConnected)
{
_connection.TryConnect();
}
_channel = _connection.CreateModel();
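// Make sure the queue exists (the publisher already declares it; this is a safety net)
_channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
_channel.BasicQos(0, 1, false); // Fair dispatch: at most one unacknowledged message per consumer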
_logger.LogInformation("OrderStatusConsumerService channel initialized");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
try
{
await ProcessMessageAsync(message);
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message: {Message}", message);
_channel.BasicNack(ea.DeliveryTag, false, false); // requeue: false, so the message is not redelivered
}
};
_channel.BasicConsume(QueueName, false, consumer);
_logger.LogInformation("OrderStatusConsumerService started consuming");
await Task.CompletedTask;
}
private async Task ProcessMessageAsync(string message)
{
using var scope = _serviceProvider.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
try
{
var statusMessage = JsonSerializer.Deserialize<OrderStatusMessage>(message);
if (statusMessage != null)
{
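// Handle the status update here, e.g. update the database or send a notification
_logger.LogInformation("Received order status update: {OrderId} -> {Status}",
statusMessage.OrderId, statusMessage.Status);
// In a real application the order status would be persisted here. Note that
// UpdateOrderStatusAsync as written also publishes a status message, so calling it
// from this consumer would feed the same queue again.
// await orderService.UpdateOrderStatusAsync(statusMessage.OrderId, statusMessage.Status);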
}
}
catch (JsonException ex)
{
_logger.LogError(ex, "Error deserializing message: {Message}", message);
throw;
}
}
public override void Dispose()
{
_channel?.Close();
_channel?.Dispose();
base.Dispose();
}
}
}
Controllers/OrdersController.cs
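using Microsoft.AspNetCore.Mvc;
using Order.API.Services;
namespace Order.API.Controllers
{
// Imported inside the namespace so that "Order" binds to the model type rather than to the root "Order" namespace
using Order.Core.Models;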
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IOrderService _orderService;
private readonly ILogger<OrdersController> _logger;
public OrdersController(IOrderService orderService, ILogger<OrdersController> logger)
{
_orderService = orderService;
_logger = logger;
}
[HttpPost]
public async Task<ActionResult<Order>> CreateOrder([FromBody] CreateOrderRequest request)
{
try
{
var order = await _orderService.CreateOrderAsync(
request.CustomerId,
request.ProductId,
request.Quantity,
request.UnitPrice);
return Ok(order);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error creating order");
return StatusCode(500, "Error creating order");
}
}
[HttpGet("{orderId}")]
public async Task<ActionResult<Order>> GetOrder(string orderId)
{
var order = await _orderService.GetOrderAsync(orderId);
if (order == null)
{
return NotFound();
}
return Ok(order);
}
[HttpGet]
public ActionResult<IEnumerable<Order>> GetOrders()
{
// Demo only: a real implementation would load the orders from a database
return Ok(new List<Order>());
}
}
public class CreateOrderRequest
{
public string CustomerId { get; set; }
public string ProductId { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
}
}
Step 5: Order processor service (OrderProcessor.Service)
Program.cs
using Order.Core.Messages;
using Order.Infrastructure.Services;
using OrderProcessor.Service.Services;
using RabbitMQ.Client;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<OrderProcessorService>();
// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
var configuration = sp.GetRequiredService<IConfiguration>();
return new ConnectionFactory
{
HostName = configuration["RabbitMQ:HostName"],
UserName = configuration["RabbitMQ:UserName"],
Password = configuration["RabbitMQ:Password"],
Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
DispatchConsumersAsync = true
};
});
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
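// Singleton rather than scoped: the publisher is injected into a hosted service, which is itself a singleton
builder.Services.AddSingleton<IOrderPublisher, OrderPublisher>();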
builder.Services.AddLogging();
var host = builder.Build();
host.Run();
Services/OrderProcessorService.cs
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Order.Core.Messages;
using Order.Infrastructure.Services;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace OrderProcessor.Service.Services
{
public class OrderProcessorService : BackgroundService
{
private readonly IRabbitMQConnection _connection;
private readonly IOrderPublisher _orderPublisher;
private readonly ILogger<OrderProcessorService> _logger;
private IModel _channel;
private const string QueueName = "order.created.queue";
public OrderProcessorService(
IRabbitMQConnection connection,
IOrderPublisher orderPublisher,
ILogger<OrderProcessorService> logger)
{
_connection = connection;
_orderPublisher = orderPublisher;
_logger = logger;
InitializeChannel();
}
private void InitializeChannel()
{
if (!_connection.IsConnected)
{
_connection.TryConnect();
}
_channel = _connection.CreateModel();
_channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
_channel.BasicQos(0, 1, false);
_logger.LogInformation("OrderProcessorService channel initialized");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
try
{
await ProcessOrderAsync(message);
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing order: {Message}", message);
_channel.BasicNack(ea.DeliveryTag, false, false);
}
};
_channel.BasicConsume(QueueName, false, consumer);
_logger.LogInformation("OrderProcessorService started consuming orders");
await Task.CompletedTask;
}
private async Task ProcessOrderAsync(string message)
{
try
{
var orderMessage = JsonSerializer.Deserialize<OrderMessage>(message);
if (orderMessage == null)
{
_logger.LogWarning("Received invalid order message: {Message}", message);
return;
}
_logger.LogInformation("Processing order {OrderId} for customer {CustomerId}",
orderMessage.OrderId, orderMessage.CustomerId);
// Simulate the order-processing logic
await ProcessOrderBusinessLogic(orderMessage);
// Publish the completed status
var statusMessage = new OrderStatusMessage
{
OrderId = orderMessage.OrderId,
Status = Order.Core.Models.OrderStatus.Completed,
Message = "Order processed successfully"
};
await _orderPublisher.PublishOrderStatusAsync(statusMessage);
_logger.LogInformation("Order {OrderId} processed successfully", orderMessage.OrderId);
}
catch (JsonException ex)
{
_logger.LogError(ex, "Error deserializing order message: {Message}", message);
throw;
}
}
private async Task ProcessOrderBusinessLogic(OrderMessage orderMessage)
{
// Simulate complex business logic
_logger.LogInformation("Starting business logic for order {OrderId}", orderMessage.OrderId);
// Simulate processing time
var random = new Random();
var processingTime = random.Next(2000, 5000);
await Task.Delay(processingTime);
// Simulate a 10% failure rate
if (random.Next(0, 10) == 0)
{
throw new Exception("Simulated business logic failure");
}
_logger.LogInformation("Business logic completed for order {OrderId}", orderMessage.OrderId);
}
public override void Dispose()
{
_channel?.Close();
_channel?.Dispose();
base.Dispose();
}
}
}
Step 6: Run and test
- Start the services
# Terminal 1: start Order.API
cd Order.API
dotnet run
# Terminal 2: start OrderProcessor.Service
cd OrderProcessor.Service
dotnet run
- Test the API
# Create an order
curl -X POST "https://localhost:7000/api/orders" \
-H "Content-Type: application/json" \
-d '{
"customerId": "customer-123",
"productId": "product-456",
"quantity": 2,
"unitPrice": 29.99
}'
# Query the order status
curl "https://localhost:7000/api/orders/{orderId}"
- Test the health check
GET https://localhost:7000/health
- Watch the log output
- Test error scenarios
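Step 7: Advanced features - configuring retries and resilience
Add Polly support to Order.Infrastructure:
# Add the NuGet packages
dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly
// Add a retry policy in Program.cs. This policy applies to outbound HTTP calls made through the named client; it does not retry RabbitMQ operations.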
builder.Services.AddHttpClient("retry-client")
.AddTransientHttpErrorPolicy(policy =>
policy.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));
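The policy above waits 2^retryAttempt seconds between attempts. To make that schedule concrete, and to show how the same idea could wrap a RabbitMQ operation such as TryConnect, here is a minimal sketch that uses no external packages. BackoffRetry and its members are hypothetical names introduced for illustration; this is not Polly's API and not part of the original project.

```csharp
using System;

// Hypothetical helper illustrating retry with exponential backoff.
public static class BackoffRetry
{
    // Same schedule as the policy above: 2^retryAttempt seconds (2 s, 4 s, 8 s for three retries).
    public static TimeSpan DelayFor(int retryAttempt) =>
        TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));

    // Runs the operation once, then up to maxRetries more times while it returns false.
    // The wait action is injected so callers and tests can replace Thread.Sleep.
    public static bool Execute(Func<bool> operation, int maxRetries, Action<TimeSpan> wait)
    {
        if (operation()) return true;

        for (int attempt = 1; attempt <= maxRetries; attempt++)
        {
            wait(DelayFor(attempt));
            if (operation()) return true;
        }

        return false;
    }
}
```

Under these assumptions, a call such as `BackoffRetry.Execute(connection.TryConnect, 3, System.Threading.Thread.Sleep)` would attempt the connection up to four times in total. Blocking with Thread.Sleep is acceptable during startup but should be avoided on request threads.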
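Chapter Summary
In this chapter we integrated RabbitMQ into an ASP.NET Core application and built a complete microservice system:
- Dependency injection configuration: manages the lifetime of RabbitMQ connections and channels.
- Hosted services: uses BackgroundService to implement long-running consumer services.
- Domain-driven design: a layered architecture that separates concerns.
- Message serialization: message bodies are serialized as JSON.
- Health checks: integrates RabbitMQ health monitoring.
- Error handling: error handling and logging in both the publisher and the consumers.
- Configuration management: reads the connection settings from the configuration file.
This architecture provides a foundation for building production-grade microservice systems. In the next chapter we will look at the RPC pattern in RabbitMQ.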