事件驅動架構實現用户行為積分獎勵系統

事件驅動架構(EDA)適合處理用户行為積分系統,因其天然支持異步、解耦和可擴展性。以下為基於Java和MySQL的實現方案:

核心組件設計

系統由事件生產者、事件消費者、積分規則引擎和存儲層構成。事件生產者負責捕捉用户行為(如登錄、購物),事件消費者處理行為並觸發積分計算。

MySQL設計兩張核心表:

  • 用户行為表(user_behavior):存儲事件ID、用户ID、行為類型、時間戳等
  • 積分賬户表(points_account):記錄用户ID、當前積分、最後更新時間
CREATE TABLE user_behavior (
    event_id VARCHAR(36) PRIMARY KEY,
    user_id BIGINT NOT NULL,
    behavior_type VARCHAR(50) NOT NULL,
    event_data JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE points_account (
    user_id BIGINT PRIMARY KEY,
    points_balance INT DEFAULT 0,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

事件發佈與訂閲實現

使用Spring Event框架實現事件驅動機制。定義基礎事件類:

public abstract class UserBehaviorEvent {
    private String eventId;
    private Long userId;
    private LocalDateTime timestamp;
    // getters/setters
}

public class UserLoginEvent extends UserBehaviorEvent {
    private String loginDevice;
    // 其他登錄相關屬性
}

配置事件發佈者:

@Service
public class UserBehaviorService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void recordLogin(Long userId) {
        UserLoginEvent event = new UserLoginEvent();
        event.setUserId(userId);
        event.setEventId(UUID.randomUUID().toString());
        eventPublisher.publishEvent(event);
    }
}

積分規則引擎

採用策略模式實現不同行為的積分規則:

public interface PointsCalculationStrategy {
    int calculatePoints(UserBehaviorEvent event);
}

@Service
public class LoginPointsStrategy implements PointsCalculationStrategy {
    @Override
    public int calculatePoints(UserBehaviorEvent event) {
        return 10; // 每次登錄固定10分
    }
}

事件處理器集成規則引擎:

@Service
@Transactional
public class PointsEventHandler {
    @Autowired
    private PointsRepository pointsRepo;
    
    @Autowired
    private Map<String, PointsCalculationStrategy> strategies;

    @EventListener
    public void handleUserBehaviorEvent(UserBehaviorEvent event) {
        String behaviorType = event.getClass().getSimpleName();
        PointsCalculationStrategy strategy = strategies.get(behaviorType);
        
        if(strategy != null) {
            int points = strategy.calculatePoints(event);
            pointsRepo.addPoints(event.getUserId(), points);
        }
    }
}

異步處理優化

對於高併發場景,引入Spring @Async實現異步處理:

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }
}

@Async
@EventListener
public void handleUserBehaviorEventAsync(UserBehaviorEvent event) {
    // 同上處理邏輯
}

數據一致性保障

採用本地事務表模式保證最終一致性:

CREATE TABLE points_transaction (
    tx_id VARCHAR(36) PRIMARY KEY,
    user_id BIGINT NOT NULL,
    points_change INT NOT NULL,
    status VARCHAR(20) DEFAULT 'PENDING',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

處理流程:

  1. 先記錄積分變動事務
  2. 更新積分賬户
  3. 標記事務完成
  4. 定時任務補償處理失敗事務
@Scheduled(fixedRate = 300000)
public void compensateFailedTransactions() {
    List<PointsTransaction> pendingTx = transactionRepo.findPending();
    pendingTx.forEach(this::retryTransaction);
}

性能優化措施

MySQL層面優化:

  • 為user_behavior表的user_id和behavior_type創建聯合索引
  • 對points_account表進行分庫分表設計
  • 使用Redis緩存熱點用户的積分數據
@Cacheable(value = "userPoints", key = "#userId")
public Integer getPointsBalance(Long userId) {
    return pointsRepo.findBalanceByUserId(userId);
}

監控與日誌

集成Micrometer實現指標監控:

  • 記錄每種行為類型的事件數量
  • 監控積分變動速率
  • 跟蹤事務處理延遲
@EventListener
public void handleWithMetrics(UserBehaviorEvent event) {
    Counter.builder("user.behavior.events")
           .tag("type", event.getClass().getSimpleName())
           .register(meterRegistry)
           .increment();
    
    // 原有處理邏輯
}

該架構可根據業務需求靈活擴展,新增行為類型只需添加新的事件類和策略實現,無需修改核心處理邏輯。通過事件溯源模式可以完整追溯積分變動歷史,滿足審計要求。