本章目標
-
掌握在ASP.NET Core中配置和依賴注入RabbitMQ服務。
-
學習使用
IHostedService/BackgroundService實現常駐消費者服務。 -
實現基於RabbitMQ的請求-響應模式。
-
構建完整的微服務間異步通信解決方案。
-
學習配置管理和健康檢查。
一、理論部分
1. ASP.NET Core集成模式
將RabbitMQ集成到ASP.NET Core應用程序時,我們需要考慮幾個關鍵方面:
-
依賴注入:正確管理連接和通道的生命週期。
-
託管服務:實現後台消息消費者。
-
配置管理:從配置文件讀取RabbitMQ連接設置。
-
健康檢查:監控RabbitMQ連接狀態。
-
日誌記錄:使用ASP.NET Core的日誌系統。
2. 生命週期管理
-
IConnection:建議註冊為單例,因為創建TCP連接開銷大。
-
IModel:建議註冊為瞬態或作用域,因為通道不是線程安全的。
-
生產者服務:可以註冊為作用域或瞬態。
-
消費者服務:通常在託管服務中管理。
3. 託管服務(Hosted Services)
ASP.NET Core提供了IHostedService接口和BackgroundService基類,用於實現長時間運行的後台任務。這是實現RabbitMQ消費者的理想方式。
4. 微服務架構中的消息模式
-
異步命令:發送指令但不期待立即響應。
-
事件通知:廣播狀態變化。
-
請求-響應:類似RPC,但通過消息中間件。
二、實操部分:構建訂單處理微服務
我們將創建一個完整的訂單處理系統,包含:
-
Order.API:接收HTTP訂單請求,發佈消息
-
OrderProcessor.BackgroundService:後台處理訂單
-
訂單狀態查詢API
-
健康檢查
-
配置管理
第1步:創建項目結構
# 創建解決方案 dotnet new sln -n OrderSystem # 創建Web API項目 dotnet new webapi -n Order.API dotnet new classlib -n Order.Core dotnet new classlib -n Order.Infrastructure dotnet new classlib -n OrderProcessor.Service # 添加到解決方案 dotnet sln add Order.API/Order.API.csproj dotnet sln add Order.Core/Order.Core.csproj dotnet sln add Order.Infrastructure/Order.Infrastructure.csproj dotnet sln add OrderProcessor.Service/OrderProcessor.Service.csproj # 添加項目引用 dotnet add Order.API reference Order.Core dotnet add Order.API reference Order.Infrastructure dotnet add OrderProcessor.Service reference Order.Core dotnet add OrderProcessor.Service reference Order.Infrastructure dotnet add Order.Infrastructure reference Order.Core # 添加NuGet包 cd Order.API dotnet add package RabbitMQ.Client dotnet add package Microsoft.Extensions.Diagnostics.HealthChecks cd ../Order.Infrastructure dotnet add package RabbitMQ.Client dotnet add package Microsoft.Extensions.Configuration cd ../OrderProcessor.Service dotnet add package RabbitMQ.Client
第2步:定義領域模型(Order.Core)
Models/Order.cs
Messages/OrderMessage.cs
第3步:基礎設施層(Order.Infrastructure)
Services/IRabbitMQConnection.cs
using RabbitMQ.Client; namespace Order.Infrastructure.Services { public interface IRabbitMQConnection : IDisposable { bool IsConnected { get; } IModel CreateModel(); bool TryConnect(); } }
Services/RabbitMQConnection.cs
Services/IOrderPublisher.cs
Services/OrderPublisher.cs
第4步:Order.API項目配置
appsettings.json
Program.cs
Services/IOrderService.cs
Services/OrderService.cs
using Order.Core.Messages; using Order.Core.Models; using Order.Infrastructure.Services; namespace Order.API.Services { public class OrderService : IOrderService { private readonly IOrderPublisher _orderPublisher; private readonly ILogger<OrderService> _logger; // 內存存儲用於演示(生產環境應該用數據庫) private static readonly Dictionary<string, Order> _orders = new(); public OrderService(IOrderPublisher orderPublisher, ILogger<OrderService> logger) { _orderPublisher = orderPublisher; _logger = logger; } public async Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice) { var order = new Order { CustomerId = customerId, ProductId = productId, Quantity = quantity, TotalAmount = quantity * unitPrice, Status = OrderStatus.Pending }; // 保存到內存 _orders[order.Id] = order; // 發佈訂單創建事件 var orderMessage = new OrderMessage { OrderId = order.Id, CustomerId = order.CustomerId, ProductId = order.ProductId, Quantity = order.Quantity, TotalAmount = order.TotalAmount, Action = "create" }; await _orderPublisher.PublishOrderCreatedAsync(orderMessage); _logger.LogInformation("Order {OrderId} created and published", order.Id); return order; } public Task<Order?> GetOrderAsync(string orderId) { _orders.TryGetValue(orderId, out var order); return Task.FromResult(order); } public async Task UpdateOrderStatusAsync(string orderId, OrderStatus status) { if (_orders.TryGetValue(orderId, out var order)) { order.Status = status; order.ProcessedAt = DateTime.UtcNow; // 發佈狀態更新 var statusMessage = new OrderStatusMessage { OrderId = orderId, Status = status, Message = $"Order {status.ToString().ToLower()}" }; await _orderPublisher.PublishOrderStatusAsync(statusMessage); _logger.LogInformation("Order {OrderId} status updated to {Status}", orderId, status); } } } }
Services/OrderStatusConsumerService.cs
Controllers/OrdersController.cs
第5步:訂單處理器服務(OrderProcessor.Service)
Program.cs
Services/OrderProcessorService.cs
第6步:運行與測試
-
啓動服務
-
測試API
-
測試健康檢查
-
觀察日誌輸出
-
Order.API:接收HTTP請求,發佈訂單創建消息
-
OrderProcessor.Service:消費訂單消息,處理業務邏輯,發佈狀態更新
-
Order.API:消費狀態更新消息
-
-
測試錯誤場景
-
停止RabbitMQ服務,觀察重連機制
-
停止OrderProcessor.Service,觀察消息堆積
-
重啓服務,觀察消息恢復處理
-
第7步:高級特性 - 配置重試和 resilience
在Order.Infrastructure中添加Polly支持:
本章總結
在這一章中,我們成功地將RabbitMQ集成到ASP.NET Core應用程序中,構建了一個完整的微服務系統:
-
依賴注入配置:正確管理RabbitMQ連接和通道的生命週期。
-
託管服務:使用
BackgroundService實現長時間運行的消費者服務。 -
領域驅動設計:採用分層架構,分離關注點。
-
消息序列化:使用JSON序列化消息體。
-
健康檢查:集成RabbitMQ健康監控。
-
錯誤處理:實現完善的錯誤處理和日誌記錄。
-
配置管理:從配置文件讀取連接字符串。
這個架構為構建生產級的微服務系統提供了堅實的基礎。在下一章,我們將學習RabbitMQ的RPC模式。