事件驅動架構實現用户行為積分獎勵系統
事件驅動架構(EDA)適合處理用户行為積分系統,因其天然支持異步、解耦和可擴展性。以下為基於Java和MySQL的實現方案:
核心組件設計
系統由事件生產者、事件消費者、積分規則引擎和存儲層構成。事件生產者負責捕捉用户行為(如登錄、購物),事件消費者處理行為並觸發積分計算。
MySQL設計兩張核心表:
- 用户行為表(user_behavior):存儲事件ID、用户ID、行為類型、時間戳等
- 積分賬户表(points_account):記錄用户ID、當前積分、最後更新時間
CREATE TABLE user_behavior (
event_id VARCHAR(36) PRIMARY KEY,
user_id BIGINT NOT NULL,
behavior_type VARCHAR(50) NOT NULL,
event_data JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE points_account (
user_id BIGINT PRIMARY KEY,
points_balance INT DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
事件發佈與訂閲實現
使用Spring Event框架實現事件驅動機制。定義基礎事件類:
public abstract class UserBehaviorEvent {
private String eventId;
private Long userId;
private LocalDateTime timestamp;
// getters/setters
}
public class UserLoginEvent extends UserBehaviorEvent {
private String loginDevice;
// 其他登錄相關屬性
}
配置事件發佈者:
@Service
public class UserBehaviorService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void recordLogin(Long userId) {
UserLoginEvent event = new UserLoginEvent();
event.setUserId(userId);
event.setEventId(UUID.randomUUID().toString());
eventPublisher.publishEvent(event);
}
}
積分規則引擎
採用策略模式實現不同行為的積分規則:
public interface PointsCalculationStrategy {
int calculatePoints(UserBehaviorEvent event);
}
@Service
public class LoginPointsStrategy implements PointsCalculationStrategy {
@Override
public int calculatePoints(UserBehaviorEvent event) {
return 10; // 每次登錄固定10分
}
}
事件處理器集成規則引擎:
@Service
@Transactional
public class PointsEventHandler {
@Autowired
private PointsRepository pointsRepo;
@Autowired
private Map<String, PointsCalculationStrategy> strategies;
@EventListener
public void handleUserBehaviorEvent(UserBehaviorEvent event) {
String behaviorType = event.getClass().getSimpleName();
PointsCalculationStrategy strategy = strategies.get(behaviorType);
if(strategy != null) {
int points = strategy.calculatePoints(event);
pointsRepo.addPoints(event.getUserId(), points);
}
}
}
異步處理優化
對於高併發場景,引入Spring @Async實現異步處理:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.initialize();
return executor;
}
}
@Async
@EventListener
public void handleUserBehaviorEventAsync(UserBehaviorEvent event) {
// 同上處理邏輯
}
數據一致性保障
採用本地事務表模式保證最終一致性:
CREATE TABLE points_transaction (
tx_id VARCHAR(36) PRIMARY KEY,
user_id BIGINT NOT NULL,
points_change INT NOT NULL,
status VARCHAR(20) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
處理流程:
- 先記錄積分變動事務
- 更新積分賬户
- 標記事務完成
- 定時任務補償處理失敗事務
@Scheduled(fixedRate = 300000)
public void compensateFailedTransactions() {
List<PointsTransaction> pendingTx = transactionRepo.findPending();
pendingTx.forEach(this::retryTransaction);
}
性能優化措施
MySQL層面優化:
- 為user_behavior表的user_id和behavior_type創建聯合索引
- 對points_account表進行分庫分表設計
- 使用Redis緩存熱點用户的積分數據
@Cacheable(value = "userPoints", key = "#userId")
public Integer getPointsBalance(Long userId) {
return pointsRepo.findBalanceByUserId(userId);
}
監控與日誌
集成Micrometer實現指標監控:
- 記錄每種行為類型的事件數量
- 監控積分變動速率
- 跟蹤事務處理延遲
@EventListener
public void handleWithMetrics(UserBehaviorEvent event) {
Counter.builder("user.behavior.events")
.tag("type", event.getClass().getSimpleName())
.register(meterRegistry)
.increment();
// 原有處理邏輯
}
該架構可根據業務需求靈活擴展,新增行為類型只需添加新的事件類和策略實現,無需修改核心處理邏輯。通過事件溯源模式可以完整追溯積分變動歷史,滿足審計要求。