Stories

Detail Return Return

【RabbitMQ】與ASP.NET Core集成 - Stories Detail

本章目標

  • 掌握在ASP.NET Core中配置和依賴注入RabbitMQ服務。

  • 學習使用IHostedService/BackgroundService實現常駐消費者服務。

  • 實現基於RabbitMQ的請求-響應模式。

  • 構建完整的微服務間異步通信解決方案。

  • 學習配置管理和健康檢查。


一、理論部分

1. ASP.NET Core集成模式

將RabbitMQ集成到ASP.NET Core應用程序時,我們需要考慮幾個關鍵方面:

  • 依賴注入:正確管理連接和通道的生命週期。

  • 託管服務:實現後台消息消費者。

  • 配置管理:從配置文件讀取RabbitMQ連接設置。

  • 健康檢查:監控RabbitMQ連接狀態。

  • 日誌記錄:使用ASP.NET Core的日誌系統。

2. 生命週期管理

  • IConnection:建議註冊為單例,因為創建TCP連接開銷大。

  • IModel:建議註冊為瞬態或作用域,因為通道不是線程安全的。

  • 生產者服務:可以註冊為作用域或瞬態。

  • 消費者服務:通常在託管服務中管理。

3. 託管服務(Hosted Services)

ASP.NET Core提供了IHostedService接口和BackgroundService基類,用於實現長時間運行的後台任務。這是實現RabbitMQ消費者的理想方式。

4. 微服務架構中的消息模式

  • 異步命令:發送指令但不期待立即響應。

  • 事件通知:廣播狀態變化。

  • 請求-響應:類似RPC,但通過消息中間件。


二、實操部分:構建訂單處理微服務

我們將創建一個完整的訂單處理系統,包含:

  • Order.API:接收HTTP訂單請求,發佈消息

  • OrderProcessor.BackgroundService:後台處理訂單

  • 訂單狀態查詢API

  • 健康檢查

  • 配置管理

第1步:創建項目結構

# 創建解決方案
dotnet new sln -n OrderSystem

# 創建Web API項目
dotnet new webapi -n Order.API
dotnet new classlib -n Order.Core
dotnet new classlib -n Order.Infrastructure
dotnet new classlib -n OrderProcessor.Service

# 添加到解決方案
dotnet sln add Order.API/Order.API.csproj
dotnet sln add Order.Core/Order.Core.csproj
dotnet sln add Order.Infrastructure/Order.Infrastructure.csproj
dotnet sln add OrderProcessor.Service/OrderProcessor.Service.csproj

# 添加項目引用
dotnet add Order.API reference Order.Core
dotnet add Order.API reference Order.Infrastructure
dotnet add OrderProcessor.Service reference Order.Core
dotnet add OrderProcessor.Service reference Order.Infrastructure
dotnet add Order.Infrastructure reference Order.Core

# 添加NuGet包
cd Order.API
dotnet add package RabbitMQ.Client
dotnet add package Microsoft.Extensions.Diagnostics.HealthChecks

cd ../Order.Infrastructure
dotnet add package RabbitMQ.Client
dotnet add package Microsoft.Extensions.Configuration

cd ../OrderProcessor.Service
dotnet add package RabbitMQ.Client

第2步:定義領域模型(Order.Core)

Models/Order.cs

namespace Order.Core.Models
{
    public class Order
    {
        public string Id { get; set; } = Guid.NewGuid().ToString();
        public string CustomerId { get; set; }
        public string ProductId { get; set; }
        public int Quantity { get; set; }
        public decimal TotalAmount { get; set; }
        public OrderStatus Status { get; set; } = OrderStatus.Pending;
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
        public DateTime? ProcessedAt { get; set; }
    }

    public enum OrderStatus
    {
        Pending,
        Processing,
        Completed,
        Failed,
        Cancelled
    }
}

Messages/OrderMessage.cs

namespace Order.Core.Messages
{
    public class OrderMessage
    {
        public string OrderId { get; set; }
        public string CustomerId { get; set; }
        public string ProductId { get; set; }
        public int Quantity { get; set; }
        public decimal TotalAmount { get; set; }
        public string Action { get; set; } // "create", "cancel"
    }

    public class OrderStatusMessage
    {
        public string OrderId { get; set; }
        public OrderStatus Status { get; set; }
        public string Message { get; set; }
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;
    }
}

第3步:基礎設施層(Order.Infrastructure)

Services/IRabbitMQConnection.cs

using RabbitMQ.Client;

namespace Order.Infrastructure.Services
{
    public interface IRabbitMQConnection : IDisposable
    {
        bool IsConnected { get; }
        IModel CreateModel();
        bool TryConnect();
    }
}

Services/RabbitMQConnection.cs

using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Order.Infrastructure.Services
{
    public class RabbitMQConnection : IRabbitMQConnection
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<RabbitMQConnection> _logger;
        private IConnection _connection;
        private bool _disposed;
        
        private readonly object _syncRoot = new object();

        public RabbitMQConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQConnection> logger)
        {
            _connectionFactory = connectionFactory;
            _logger = logger;
        }

        public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }

            return _connection.CreateModel();
        }

        public bool TryConnect()
        {
            lock (_syncRoot)
            {
                if (IsConnected) return true;

                _logger.LogInformation("RabbitMQ Client is trying to connect");

                try
                {
                    _connection = _connectionFactory.CreateConnection();
                    
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;

                    _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", 
                        _connectionFactory.HostName);

                    return true;
                }
                catch (BrokerUnreachableException ex)
                {
                    _logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
                    return false;
                }
                catch (SocketException ex)
                {
                    _logger.LogError(ex, "RabbitMQ connection failed: {Message}", ex.Message);
                    return false;
                }
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection is blocked. Reason: {Reason}", e.Reason);
            
            // 這裏可以實現重連邏輯
            TryConnect();
        }

        private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;

            _logger.LogWarning(e.Exception, "A RabbitMQ connection throw exception. Trying to re-connect...");
            
            // 這裏可以實現重連邏輯
            TryConnect();
        }

        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            if (_disposed) return;

            _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");

            // 這裏可以實現重連邏輯
            TryConnect();
        }

        public void Dispose()
        {
            if (_disposed) return;

            _disposed = true;

            try
            {
                _connection?.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex, "Error disposing RabbitMQ connection");
            }
        }
    }
}

Services/IOrderPublisher.cs

using Order.Core.Messages;

namespace Order.Infrastructure.Services
{
    public interface IOrderPublisher
    {
        Task PublishOrderCreatedAsync(OrderMessage order);
        Task PublishOrderStatusAsync(OrderStatusMessage status);
    }
}

Services/OrderPublisher.cs

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Order.Core.Messages;
using RabbitMQ.Client;

namespace Order.Infrastructure.Services
{
    public class OrderPublisher : IOrderPublisher
    {
        private readonly IRabbitMQConnection _connection;
        private readonly ILogger<OrderPublisher> _logger;
        private const string ExchangeName = "order.events";
        private const string OrderCreatedRoutingKey = "order.created";
        private const string OrderStatusRoutingKey = "order.status";

        public OrderPublisher(IRabbitMQConnection connection, ILogger<OrderPublisher> logger)
        {
            _connection = connection;
            _logger = logger;
            
            // 確保交換機和隊列存在
            InitializeInfrastructure();
        }

        private void InitializeInfrastructure()
        {
            using var channel = _connection.CreateModel();
            
            // 聲明主題交換機
            channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true);
            
            // 聲明訂單創建隊列
            channel.QueueDeclare("order.created.queue", durable: true, exclusive: false, autoDelete: false);
            channel.QueueBind("order.created.queue", ExchangeName, OrderCreatedRoutingKey);
            
            // 聲明訂單狀態隊列
            channel.QueueDeclare("order.status.queue", durable: true, exclusive: false, autoDelete: false);
            channel.QueueBind("order.status.queue", ExchangeName, OrderStatusRoutingKey);
            
            _logger.LogInformation("RabbitMQ infrastructure initialized");
        }

        public async Task PublishOrderCreatedAsync(OrderMessage order)
        {
            await PublishMessageAsync(order, OrderCreatedRoutingKey, "OrderCreated");
        }

        public async Task PublishOrderStatusAsync(OrderStatusMessage status)
        {
            await PublishMessageAsync(status, OrderStatusRoutingKey, "OrderStatus");
        }

        private async Task PublishMessageAsync<T>(T message, string routingKey, string messageType)
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }

            using var channel = _connection.CreateModel();
            
            var json = JsonSerializer.Serialize(message);
            var body = Encoding.UTF8.GetBytes(json);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.ContentType = "application/json";
            properties.Type = messageType;

            try
            {
                channel.BasicPublish(
                    exchange: ExchangeName,
                    routingKey: routingKey,
                    mandatory: true,
                    basicProperties: properties,
                    body: body);

                _logger.LogInformation("Published {MessageType} message for Order {OrderId}", 
                    messageType, GetOrderId(message));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error publishing {MessageType} message for Order {OrderId}", 
                    messageType, GetOrderId(message));
                throw;
            }

            await Task.CompletedTask;
        }

        private static string GetOrderId<T>(T message)
        {
            return message switch
            {
                OrderMessage order => order.OrderId,
                OrderStatusMessage status => status.OrderId,
                _ => "unknown"
            };
        }
    }
}

第4步:Order.API項目配置

appsettings.json

{
  "RabbitMQ": {
    "HostName": "localhost",
    "UserName": "myuser",
    "Password": "mypassword",
    "Port": 5672,
    "VirtualHost": "/"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*"
}

Program.cs

using Order.API.Controllers;
using Order.API.Services;
using Order.Core.Models;
using Order.Infrastructure.Services;
using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new ConnectionFactory
    {
        HostName = configuration["RabbitMQ:HostName"],
        UserName = configuration["RabbitMQ:UserName"],
        Password = configuration["RabbitMQ:Password"],
        Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
        VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
        DispatchConsumersAsync = true
    };
});

// Register RabbitMQ services
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
builder.Services.AddScoped<IOrderPublisher, OrderPublisher>();
builder.Services.AddScoped<IOrderService, OrderService>();

// Add Health Checks
builder.Services.AddHealthChecks()
    .AddRabbitMQ(provider => 
    {
        var factory = provider.GetRequiredService<IConnectionFactory>();
        return factory.CreateConnection();
    }, name: "rabbitmq");

// Add hosted service for status updates consumer
builder.Services.AddHostedService<OrderStatusConsumerService>();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();

// Add health check endpoint
app.MapHealthChecks("/health");

app.Run();

Services/IOrderService.cs

using Order.Core.Models;

namespace Order.API.Services
{
    public interface IOrderService
    {
        Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice);
        Task<Order?> GetOrderAsync(string orderId);
        Task UpdateOrderStatusAsync(string orderId, OrderStatus status);
    }
}

Services/OrderService.cs

using Order.Core.Messages;
using Order.Core.Models;
using Order.Infrastructure.Services;

namespace Order.API.Services
{
    public class OrderService : IOrderService
    {
        private readonly IOrderPublisher _orderPublisher;
        private readonly ILogger<OrderService> _logger;
        
        // 內存存儲用於演示(生產環境應該用數據庫)
        private static readonly Dictionary<string, Order> _orders = new();

        public OrderService(IOrderPublisher orderPublisher, ILogger<OrderService> logger)
        {
            _orderPublisher = orderPublisher;
            _logger = logger;
        }

        public async Task<Order> CreateOrderAsync(string customerId, string productId, int quantity, decimal unitPrice)
        {
            var order = new Order
            {
                CustomerId = customerId,
                ProductId = productId,
                Quantity = quantity,
                TotalAmount = quantity * unitPrice,
                Status = OrderStatus.Pending
            };

            // 保存到內存
            _orders[order.Id] = order;

            // 發佈訂單創建事件
            var orderMessage = new OrderMessage
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                ProductId = order.ProductId,
                Quantity = order.Quantity,
                TotalAmount = order.TotalAmount,
                Action = "create"
            };

            await _orderPublisher.PublishOrderCreatedAsync(orderMessage);
            
            _logger.LogInformation("Order {OrderId} created and published", order.Id);

            return order;
        }

        public Task<Order?> GetOrderAsync(string orderId)
        {
            _orders.TryGetValue(orderId, out var order);
            return Task.FromResult(order);
        }

        public async Task UpdateOrderStatusAsync(string orderId, OrderStatus status)
        {
            if (_orders.TryGetValue(orderId, out var order))
            {
                order.Status = status;
                order.ProcessedAt = DateTime.UtcNow;
                
                // 發佈狀態更新
                var statusMessage = new OrderStatusMessage
                {
                    OrderId = orderId,
                    Status = status,
                    Message = $"Order {status.ToString().ToLower()}"
                };

                await _orderPublisher.PublishOrderStatusAsync(statusMessage);
                
                _logger.LogInformation("Order {OrderId} status updated to {Status}", orderId, status);
            }
        }
    }
}
View Code

Services/OrderStatusConsumerService.cs

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Options;
using Order.API.Services;
using Order.Core.Messages;
using Order.Infrastructure.Services;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Order.API.Services
{
    public class OrderStatusConsumerService : BackgroundService
    {
        private readonly IRabbitMQConnection _connection;
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<OrderStatusConsumerService> _logger;
        private IModel _channel;
        private const string QueueName = "order.status.queue";

        public OrderStatusConsumerService(
            IRabbitMQConnection connection,
            IServiceProvider serviceProvider,
            ILogger<OrderStatusConsumerService> logger)
        {
            _connection = connection;
            _serviceProvider = serviceProvider;
            _logger = logger;
            InitializeChannel();
        }

        private void InitializeChannel()
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }

            _channel = _connection.CreateModel();
            
            // 確保隊列存在(已經在Publisher中聲明,這裏做雙重保險)
            _channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
            
            _channel.BasicQos(0, 1, false); // 公平分發
            
            _logger.LogInformation("OrderStatusConsumerService channel initialized");
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();

            var consumer = new AsyncEventingBasicConsumer(_channel);
            consumer.Received += async (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                try
                {
                    await ProcessMessageAsync(message);
                    _channel.BasicAck(ea.DeliveryTag, false);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing message: {Message}", message);
                    _channel.BasicNack(ea.DeliveryTag, false, false); // 不重新入隊
                }
            };

            _channel.BasicConsume(QueueName, false, consumer);

            _logger.LogInformation("OrderStatusConsumerService started consuming");

            await Task.CompletedTask;
        }

        private async Task ProcessMessageAsync(string message)
        {
            using var scope = _serviceProvider.CreateScope();
            var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();

            try
            {
                var statusMessage = JsonSerializer.Deserialize<OrderStatusMessage>(message);
                if (statusMessage != null)
                {
                    // 這裏可以處理狀態更新,比如更新數據庫、發送通知等
                    _logger.LogInformation("Received order status update: {OrderId} -> {Status}", 
                        statusMessage.OrderId, statusMessage.Status);
                    
                    // 在實際應用中,這裏可能會更新數據庫中的訂單狀態
                    // await orderService.UpdateOrderStatusAsync(statusMessage.OrderId, statusMessage.Status);
                }
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Error deserializing message: {Message}", message);
                throw;
            }
        }

        public override void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            base.Dispose();
        }
    }
}
View Code

Controllers/OrdersController.cs

using Microsoft.AspNetCore.Mvc;
using Order.API.Services;
using Order.Core.Models;

namespace Order.API.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class OrdersController : ControllerBase
    {
        private readonly IOrderService _orderService;
        private readonly ILogger<OrdersController> _logger;

        public OrdersController(IOrderService orderService, ILogger<OrdersController> logger)
        {
            _orderService = orderService;
            _logger = logger;
        }

        [HttpPost]
        public async Task<ActionResult<Order>> CreateOrder([FromBody] CreateOrderRequest request)
        {
            try
            {
                var order = await _orderService.CreateOrderAsync(
                    request.CustomerId, 
                    request.ProductId, 
                    request.Quantity, 
                    request.UnitPrice);

                return Ok(order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error creating order");
                return StatusCode(500, "Error creating order");
            }
        }

        [HttpGet("{orderId}")]
        public async Task<ActionResult<Order>> GetOrder(string orderId)
        {
            var order = await _orderService.GetOrderAsync(orderId);
            if (order == null)
            {
                return NotFound();
            }
            return Ok(order);
        }

        [HttpGet]
        public ActionResult<IEnumerable<Order>> GetOrders()
        {
            // 這裏只是演示,實際應該從數據庫獲取
            return Ok(new List<Order>());
        }
    }

    public class CreateOrderRequest
    {
        public string CustomerId { get; set; }
        public string ProductId { get; set; }
        public int Quantity { get; set; }
        public decimal UnitPrice { get; set; }
    }
}
View Code

第5步:訂單處理器服務(OrderProcessor.Service)

Program.cs

using Order.Core.Messages;
using Order.Infrastructure.Services;
using OrderProcessor.Service.Services;
using RabbitMQ.Client;

var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddHostedService<OrderProcessorService>();

// Configure RabbitMQ
builder.Services.AddSingleton<IConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new ConnectionFactory
    {
        HostName = configuration["RabbitMQ:HostName"],
        UserName = configuration["RabbitMQ:UserName"],
        Password = configuration["RabbitMQ:Password"],
        Port = int.Parse(configuration["RabbitMQ:Port"] ?? "5672"),
        VirtualHost = configuration["RabbitMQ:VirtualHost"] ?? "/",
        DispatchConsumersAsync = true
    };
});

builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>();
builder.Services.AddScoped<IOrderPublisher, OrderPublisher>();

builder.Services.AddLogging();

var host = builder.Build();
host.Run();
View Code

Services/OrderProcessorService.cs

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Order.Core.Messages;
using Order.Infrastructure.Services;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace OrderProcessor.Service.Services
{
    public class OrderProcessorService : BackgroundService
    {
        private readonly IRabbitMQConnection _connection;
        private readonly IOrderPublisher _orderPublisher;
        private readonly ILogger<OrderProcessorService> _logger;
        private IModel _channel;
        private const string QueueName = "order.created.queue";

        public OrderProcessorService(
            IRabbitMQConnection connection,
            IOrderPublisher orderPublisher,
            ILogger<OrderProcessorService> logger)
        {
            _connection = connection;
            _orderPublisher = orderPublisher;
            _logger = logger;
            InitializeChannel();
        }

        private void InitializeChannel()
        {
            if (!_connection.IsConnected)
            {
                _connection.TryConnect();
            }

            _channel = _connection.CreateModel();
            _channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false);
            _channel.BasicQos(0, 1, false);
            
            _logger.LogInformation("OrderProcessorService channel initialized");
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();

            var consumer = new AsyncEventingBasicConsumer(_channel);
            consumer.Received += async (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                try
                {
                    await ProcessOrderAsync(message);
                    _channel.BasicAck(ea.DeliveryTag, false);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing order: {Message}", message);
                    _channel.BasicNack(ea.DeliveryTag, false, false);
                }
            };

            _channel.BasicConsume(QueueName, false, consumer);

            _logger.LogInformation("OrderProcessorService started consuming orders");

            await Task.CompletedTask;
        }

        private async Task ProcessOrderAsync(string message)
        {
            try
            {
                var orderMessage = JsonSerializer.Deserialize<OrderMessage>(message);
                if (orderMessage == null)
                {
                    _logger.LogWarning("Received invalid order message: {Message}", message);
                    return;
                }

                _logger.LogInformation("Processing order {OrderId} for customer {CustomerId}", 
                    orderMessage.OrderId, orderMessage.CustomerId);

                // 模擬訂單處理邏輯
                await ProcessOrderBusinessLogic(orderMessage);

                // 發佈處理完成狀態
                var statusMessage = new OrderStatusMessage
                {
                    OrderId = orderMessage.OrderId,
                    Status = Order.Core.Models.OrderStatus.Completed,
                    Message = "Order processed successfully"
                };

                await _orderPublisher.PublishOrderStatusAsync(statusMessage);
                
                _logger.LogInformation("Order {OrderId} processed successfully", orderMessage.OrderId);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Error deserializing order message: {Message}", message);
                throw;
            }
        }

        private async Task ProcessOrderBusinessLogic(OrderMessage orderMessage)
        {
            // 模擬複雜的業務邏輯處理
            _logger.LogInformation("Starting business logic for order {OrderId}", orderMessage.OrderId);
            
            // 模擬處理時間
            var random = new Random();
            var processingTime = random.Next(2000, 5000);
            await Task.Delay(processingTime);
            
            // 模擬10%的失敗率
            if (random.Next(0, 10) == 0)
            {
                throw new Exception("Simulated business logic failure");
            }
            
            _logger.LogInformation("Business logic completed for order {OrderId}", orderMessage.OrderId);
        }

        public override void Dispose()
        {
            _channel?.Close();
            _channel?.Dispose();
            base.Dispose();
        }
    }
}
View Code

第6步:運行與測試

  1. 啓動服務

    # 終端1:啓動Order.API
    cd Order.API
    dotnet run
    
    # 終端2:啓動OrderProcessor.Service
    cd OrderProcessor.Service
    dotnet run
  2. 測試API

    # 創建訂單
    curl -X POST "https://localhost:7000/api/orders" \
         -H "Content-Type: application/json" \
         -d '{
           "customerId": "customer-123",
           "productId": "product-456", 
           "quantity": 2,
           "unitPrice": 29.99
         }'
    
    # 查詢訂單狀態
    curl "https://localhost:7000/api/orders/{orderId}"
  3. 測試健康檢查

    GET https://localhost:7000/health
  4. 觀察日誌輸出

    • Order.API:接收HTTP請求,發佈訂單創建消息

    • OrderProcessor.Service:消費訂單消息,處理業務邏輯,發佈狀態更新

    • Order.API:消費狀態更新消息

  5. 測試錯誤場景

    • 停止RabbitMQ服務,觀察重連機制

    • 停止OrderProcessor.Service,觀察消息堆積

    • 重啓服務,觀察消息恢復處理

第7步:高級特性 - 配置重試和 resilience

在Order.Infrastructure中添加Polly支持:

// 添加NuGet包
dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly

// 在Program.cs中添加重試策略
builder.Services.AddHttpClient("retry-client")
    .AddTransientHttpErrorPolicy(policy => 
        policy.WaitAndRetryAsync(3, retryAttempt => 
            TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));


本章總結

在這一章中,我們成功地將RabbitMQ集成到ASP.NET Core應用程序中,構建了一個完整的微服務系統:

  1. 依賴注入配置:正確管理RabbitMQ連接和通道的生命週期。

  2. 託管服務:使用BackgroundService實現長時間運行的消費者服務。

  3. 領域驅動設計:採用分層架構,分離關注點。

  4. 消息序列化:使用JSON序列化消息體。

  5. 健康檢查:集成RabbitMQ健康監控。

  6. 錯誤處理:實現完善的錯誤處理和日誌記錄。

  7. 配置管理:從配置文件讀取連接字符串。

這個架構為構建生產級的微服務系統提供了堅實的基礎。在下一章,我們將學習RabbitMQ的RPC模式。

user avatar u_9849794 Avatar ligaai Avatar kohler21 Avatar buildyuan Avatar tssc Avatar dalideshoushudao Avatar wnhyang Avatar java_study Avatar wuliaodeliema Avatar java_3y Avatar best_6455a509a2177 Avatar fulade Avatar
Favorites 57 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.