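04 - Step Executor Design 🔧

🎯 Goal: design an extensible step executor architecture and implement the execution logic for each type of step

🤔 Why do we need multiple executors?

Different business scenarios call for different execution logic:

  • 🔀 Conditional branching: choose an execution path based on a condition
  • ⚡ Parallel execution: run several independent tasks at the same time
  • 🔄 Loop execution: repeat a set of steps until a condition is met
  • 📝 Script execution: run script code dynamically
  • 🌐 Remote calls: call external services or APIs

🏗️ Executor architecture design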

graph TB
    subgraph "StepExecutor interface layer 🎯"
        A1[StepExecutor]
        A2[AbstractStepExecutor]
    end
    
    subgraph "Basic executors 🔧"
        B1[SimpleStepExecutor]
        B2[ServiceStepExecutor]
        B3[ScriptStepExecutor]
    end
    
    subgraph "Control-flow executors 🔀"
        C1[ConditionalStepExecutor]
        C2[ParallelStepExecutor]
        C3[LoopStepExecutor]
        C4[ScriptConditionalStepExecutor]
    end
    
    subgraph "Extension executors 🚀"
        D1[HttpStepExecutor]
        D2[DatabaseStepExecutor]
        D3[MessageStepExecutor]
    end
    
    A1 --> A2
    A2 --> B1
    A2 --> B2
    A2 --> B3
    A2 --> C1
    A2 --> C2
    A2 --> C3
    A2 --> C4
    A2 --> D1
    A2 --> D2
    A2 --> D3
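
The StepExecutor interface at the top of the diagram is not listed in this chapter. Judging from the methods the executors override, it plausibly exposes the supported step type and an execute method, and the engine picks an executor by step type at runtime. The sketch below illustrates that lookup under those assumptions. Every name in it (ExecutorRegistrySketch, ExecutorRegistry, SetValueExecutor) is hypothetical, and plain strings and maps stand in for StepType, StepDefinition and FlowContext.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: simplified stand-ins, not the framework's real types.
class ExecutorRegistrySketch {

    /** Assumed shape of the executor contract: one strategy per step type. */
    interface StepExecutor {
        String getSupportedType();

        Object execute(Map<String, Object> config, Map<String, Object> context);
    }

    /** Selects the executor for a step type at runtime. */
    static final class ExecutorRegistry {
        private final Map<String, StepExecutor> executors = new HashMap<>();

        void register(StepExecutor executor) {
            executors.put(executor.getSupportedType(), executor);
        }

        StepExecutor get(String type) {
            StepExecutor executor = executors.get(type);
            if (executor == null) {
                throw new IllegalStateException("No executor registered for step type: " + type);
            }
            return executor;
        }
    }

    /** Copies config "value" into the context under the key named by config "targetKey". */
    static final class SetValueExecutor implements StepExecutor {
        @Override
        public String getSupportedType() {
            return "SIMPLE";
        }

        @Override
        public Object execute(Map<String, Object> config, Map<String, Object> context) {
            Object value = config.get("value");
            context.put((String) config.get("targetKey"), value);
            return value;
        }
    }

    public static void main(String[] args) {
        ExecutorRegistry registry = new ExecutorRegistry();
        registry.register(new SetValueExecutor());

        Map<String, Object> config = new HashMap<>();
        config.put("value", "hello");
        config.put("targetKey", "greeting");
        Map<String, Object> context = new HashMap<>();

        registry.get("SIMPLE").execute(config, context);
        System.out.println(context.get("greeting"));
    }
}
```

Adding a new step type in this sketch means registering one more StepExecutor; the lookup code does not change.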

🎯 The AbstractStepExecutor base class

/**
 * Abstract base class for step executors.
 * Provides the shared execution flow as a template method.
 */
public abstract class AbstractStepExecutor implements StepExecutor {
    
    protected static final Logger logger = LoggerFactory.getLogger(AbstractStepExecutor.class);
    
    @Override
    public final StepExecutionResult execute(StepDefinition stepDefinition, FlowContext context) {
        long startTime = System.currentTimeMillis();
        String stepId = stepDefinition.getId();
        
        logger.debug("Starting step: {} [{}]", stepDefinition.getName(), stepId);
        
        try {
            // Pre-processing hook
            preExecute(stepDefinition, context);
            
            // Core logic
            Object result = doExecute(stepDefinition, context);
            
            // Post-processing hook
            postExecute(stepDefinition, context, result);
            
            long endTime = System.currentTimeMillis();
            
            logger.debug("Step succeeded: {} [{}], took {}ms", 
                stepDefinition.getName(), stepId, endTime - startTime);
            
            return createSuccessResult(stepDefinition, result, startTime, endTime);
            
        } catch (Exception e) {
            long endTime = System.currentTimeMillis();
            
            // Restore the interrupt flag so callers can still observe the interruption
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            
            logger.error("Step failed: {} [{}], took {}ms", 
                stepDefinition.getName(), stepId, endTime - startTime, e);
            
            return createFailureResult(stepDefinition, e, startTime, endTime);
        }
    }
    
    /**
     * Pre-processing hook.
     * @param stepDefinition the step definition
     * @param context the execution context
     */
    protected void preExecute(StepDefinition stepDefinition, FlowContext context) {
        // Empty by default; subclasses may override
    }
    
    /**
     * Core logic (subclasses must implement this).
     * @param stepDefinition the step definition
     * @param context the execution context
     * @return the step's result
     */
    protected abstract Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception;
    
    /**
     * Post-processing hook.
     * @param stepDefinition the step definition
     * @param context the execution context
     * @param result the step's result
     */
    protected void postExecute(StepDefinition stepDefinition, FlowContext context, Object result) {
        // Store the result in the context when a resultKey is configured
        String resultKey = getResultKey(stepDefinition);
        if (resultKey != null && result != null) {
            context.put(resultKey, result);
        }
    }
    
    /**
     * Returns the context key under which the result is stored, or null if none is configured.
     */
    protected String getResultKey(StepDefinition stepDefinition) {
        return (String) stepDefinition.getConfig().get("resultKey");
    }
    
    /**
     * Creates a success result.
     */
    protected StepExecutionResult createSuccessResult(
            StepDefinition stepDefinition, Object result, long startTime, long endTime) {
        return new StepExecutionResult(
            stepDefinition.getId(),
            stepDefinition.getType(),
            FlowExecutionResult.Status.SUCCESS,
            result,
            startTime,
            endTime,
            null
        );
    }
    
    /**
     * Creates a failure result.
     */
    protected StepExecutionResult createFailureResult(
            StepDefinition stepDefinition, Exception exception, long startTime, long endTime) {
        return new StepExecutionResult(
            stepDefinition.getId(),
            stepDefinition.getType(),
            FlowExecutionResult.Status.FAILED,
            null,
            startTime,
            endTime,
            exception
        );
    }
    
    /**
     * Reads a config value and checks its type; returns null if the key is absent.
     */
    protected <T> T getConfig(StepDefinition stepDefinition, String key, Class<T> type) {
        Object value = stepDefinition.getConfig().get(key);
        if (value == null) {
            return null;
        }
        
        if (type.isInstance(value)) {
            return type.cast(value);
        }
        
        throw new IllegalArgumentException(
            String.format("Config %s has the wrong type, expected: %s, actual: %s", 
                key, type.getSimpleName(), value.getClass().getSimpleName())
        );
    }
    
    /**
     * Reads a required config value; throws if it is missing.
     */
    protected <T> T getRequiredConfig(StepDefinition stepDefinition, String key, Class<T> type) {
        T value = getConfig(stepDefinition, key, type);
        if (value == null) {
            throw new IllegalArgumentException(
                String.format("Step %s is missing required config: %s", stepDefinition.getId(), key)
            );
        }
        return value;
    }
}
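
To show how little a concrete executor has to supply under this template, here is a minimal self-contained sketch. It is not the framework code: Result and BaseExecutor are simplified stand-ins for StepExecutionResult and AbstractStepExecutor, plain maps replace StepDefinition and FlowContext, and UpperCaseExecutor with its "sourceKey" config is a hypothetical example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Result and BaseExecutor are simplified stand-ins for
// StepExecutionResult and AbstractStepExecutor, not the real classes.
class TemplateMethodSketch {

    static final class Result {
        final boolean success;
        final Object value;
        final Exception error;

        Result(boolean success, Object value, Exception error) {
            this.success = success;
            this.value = value;
            this.error = error;
        }
    }

    static abstract class BaseExecutor {
        // The skeleton is final: subclasses customize the hooks, not the flow.
        final Result execute(Map<String, Object> config, Map<String, Object> context) {
            try {
                preExecute(config, context);
                Object value = doExecute(config, context);
                postExecute(config, context, value);
                return new Result(true, value, null);
            } catch (Exception e) {
                // Any exception from a hook becomes a failure result
                return new Result(false, null, e);
            }
        }

        protected void preExecute(Map<String, Object> config, Map<String, Object> context) {
            // Empty by default
        }

        protected abstract Object doExecute(Map<String, Object> config, Map<String, Object> context)
                throws Exception;

        protected void postExecute(Map<String, Object> config, Map<String, Object> context, Object value) {
            // Store the result under "resultKey" when one is configured
            Object key = config.get("resultKey");
            if (key instanceof String && value != null) {
                context.put((String) key, value);
            }
        }
    }

    /** Upper-cases the context value named by config "sourceKey". */
    static final class UpperCaseExecutor extends BaseExecutor {
        @Override
        protected Object doExecute(Map<String, Object> config, Map<String, Object> context) {
            Object source = context.get((String) config.get("sourceKey"));
            if (!(source instanceof String)) {
                throw new IllegalArgumentException("Expected a String under sourceKey");
            }
            return ((String) source).toUpperCase();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("sourceKey", "name");
        config.put("resultKey", "shout");
        Map<String, Object> context = new HashMap<>();
        context.put("name", "flow");

        Result result = new UpperCaseExecutor().execute(config, context);
        System.out.println(result.success + " " + context.get("shout"));
    }
}
```

The subclass contains only business logic; timing, logging and error conversion would live in the base class, as they do in AbstractStepExecutor.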

🔀 ConditionalStepExecutor - the conditional executor

/**
 * Conditional step executor.
 * Evaluates a condition expression and runs the matching branch.
 */
public class ConditionalStepExecutor extends AbstractStepExecutor {
    
    private final ExpressionEvaluator expressionEvaluator;
    private final FlowEngine flowEngine;
    
    public ConditionalStepExecutor(ExpressionEvaluator expressionEvaluator, FlowEngine flowEngine) {
        this.expressionEvaluator = expressionEvaluator;
        this.flowEngine = flowEngine;
    }
    
    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Read the condition expression
        String condition = getRequiredConfig(stepDefinition, "condition", String.class);
        
        // Evaluate the condition
        boolean conditionResult = expressionEvaluator.evaluateBoolean(condition, context);
        
        logger.debug("Condition evaluated: {} -> {}", condition, conditionResult);
        
        // Pick the branch to run
        List<StepDefinition> targetSteps = conditionResult ? 
            getTrueSteps(stepDefinition) : getFalseSteps(stepDefinition);
        
        if (targetSteps == null || targetSteps.isEmpty()) {
            logger.debug("No branch configured for this outcome");
            // Return the same result type as the non-empty case so callers can cast safely
            return new ConditionalExecutionResult(conditionResult, Collections.emptyList());
        }
        
        // Run the steps of the selected branch in order
        List<StepExecutionResult> branchResults = new ArrayList<>();
        for (StepDefinition step : targetSteps) {
            StepExecutor executor = flowEngine.getExecutor(step.getType());
            if (executor == null) {
                throw new IllegalStateException("No executor found for step type: " + step.getType());
            }
            
            StepExecutionResult result = executor.execute(step, context);
            branchResults.add(result);
            
            // On failure, stop unless the step is configured with continueOnFailure
            if (result.getStatus() == FlowExecutionResult.Status.FAILED && 
                !shouldContinueOnFailure(step)) {
                break;
            }
        }
        
        return new ConditionalExecutionResult(conditionResult, branchResults);
    }
    
    /**
     * Returns the steps to run when the condition is true.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getTrueSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("trueSteps");
    }
    
    /**
     * Returns the steps to run when the condition is false.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getFalseSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("falseSteps");
    }
    
    /**
     * Whether execution should continue after this step fails.
     */
    private boolean shouldContinueOnFailure(StepDefinition stepDefinition) {
        Boolean continueOnFailure = getConfig(stepDefinition, "continueOnFailure", Boolean.class);
        return Boolean.TRUE.equals(continueOnFailure);
    }
    
    @Override
    public StepType getSupportedType() {
        return StepType.CONDITIONAL;
    }
    
    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        // Validate the condition expression
        String condition = getConfig(stepDefinition, "condition", String.class);
        if (condition == null || condition.trim().isEmpty()) {
            return ValidationResult.error("A conditional step must specify the condition config");
        }
        
        // Require at least one branch
        List<StepDefinition> trueSteps = getTrueSteps(stepDefinition);
        List<StepDefinition> falseSteps = getFalseSteps(stepDefinition);
        
        if ((trueSteps == null || trueSteps.isEmpty()) && 
            (falseSteps == null || falseSteps.isEmpty())) {
            return ValidationResult.error("A conditional step must specify at least one branch");
        }
        
        return ValidationResult.success();
    }
    
    /**
     * Result of a conditional execution.
     */
    public static class ConditionalExecutionResult {
        private final boolean conditionResult;
        private final List<StepExecutionResult> branchResults;
        
        public ConditionalExecutionResult(boolean conditionResult, List<StepExecutionResult> branchResults) {
            this.conditionResult = conditionResult;
            this.branchResults = branchResults;
        }
        
        // getters...
    }
}
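
ConditionalStepExecutor delegates the actual decision to an ExpressionEvaluator, which this chapter does not list. As a rough illustration of what evaluateBoolean has to do, the sketch below handles only expressions of the form "key operator number", such as "age >= 18". It is a hypothetical stand-in: the class name ComparisonEvaluatorSketch is invented, a plain map replaces FlowContext, and the evaluator used in the tests later in this chapter accepts a richer syntax (for example "context.get('age') >= 18").

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of a very small expression evaluator.
class ComparisonEvaluatorSketch {

    // Matches e.g. "age >= 18": a context key, a comparison operator, a numeric literal.
    private static final Pattern COMPARISON =
        Pattern.compile("\\s*(\\w+)\\s*(>=|<=|==|!=|>|<)\\s*(-?\\d+(?:\\.\\d+)?)\\s*");

    static boolean evaluateBoolean(String expression, Map<String, Object> context) {
        Matcher matcher = COMPARISON.matcher(expression);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unsupported expression: " + expression);
        }

        String key = matcher.group(1);
        Object raw = context.get(key);
        if (!(raw instanceof Number)) {
            throw new IllegalArgumentException("Context value for '" + key + "' is not a number: " + raw);
        }

        double left = ((Number) raw).doubleValue();
        double right = Double.parseDouble(matcher.group(3));

        switch (matcher.group(2)) {
            case ">=": return left >= right;
            case "<=": return left <= right;
            case "==": return left == right;
            case "!=": return left != right;
            case ">":  return left > right;
            default:   return left < right; // only "<" remains
        }
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        context.put("age", 25);
        System.out.println(evaluateBoolean("age >= 18", context));
    }
}
```

Failing loudly on an unsupported expression or a non-numeric value keeps a typo in the flow definition from silently selecting the false branch.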

⚡ ParallelStepExecutor - the parallel executor

/**
 * Parallel step executor.
 * Runs several independent steps at the same time.
 */
public class ParallelStepExecutor extends AbstractStepExecutor {
    
    private final FlowEngine flowEngine;
    private final ExecutorService executorService;
    
    public ParallelStepExecutor(FlowEngine flowEngine) {
        this.flowEngine = flowEngine;
        this.executorService = Executors.newCachedThreadPool(
            new ThreadFactoryBuilder()
                .setNameFormat("parallel-step-%d")
                .setDaemon(true)
                .build()
        );
    }
    
    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Get the steps to run in parallel
        List<StepDefinition> parallelSteps = getParallelSteps(stepDefinition);
        if (parallelSteps == null || parallelSteps.isEmpty()) {
            logger.warn("Parallel step has no sub-steps configured");
            // Return the same result type as the non-empty case so callers can cast safely
            return new ParallelExecutionResult(Collections.emptyList());
        }
        
        // Read the timeout
        Integer timeoutSeconds = getConfig(stepDefinition, "timeoutSeconds", Integer.class);
        long timeout = timeoutSeconds != null ? timeoutSeconds : 300; // 5 minutes by default
        
        logger.debug("Running {} steps in parallel, timeout: {}s", parallelSteps.size(), timeout);
        
        // Create the parallel tasks; each sub-step gets its own child context
        List<CompletableFuture<StepExecutionResult>> futures = new ArrayList<>();
        
        for (StepDefinition step : parallelSteps) {
            CompletableFuture<StepExecutionResult> future = CompletableFuture.supplyAsync(
                () -> executeStep(step, context.createChild()),
                executorService
            );
            futures.add(future);
        }
        
        // Wait for all tasks to finish
        try {
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
            );
            
            allFutures.get(timeout, TimeUnit.SECONDS);
            
            // Collect the results
            List<StepExecutionResult> results = new ArrayList<>();
            for (CompletableFuture<StepExecutionResult> future : futures) {
                results.add(future.get());
            }
            
            logger.debug("Parallel execution finished, succeeded: {}, failed: {}", 
                countSuccessful(results), countFailed(results));
            
            return new ParallelExecutionResult(results);
            
        } catch (TimeoutException e) {
            // Mark unfinished tasks as cancelled. CompletableFuture.cancel does not
            // interrupt the running thread, so a sub-step already in progress keeps running
            for (CompletableFuture<StepExecutionResult> future : futures) {
                future.cancel(true);
            }
            throw new RuntimeException("Parallel execution timed out after " + timeout + "s", e);
        }
    }
    
    /**
     * Runs a single step.
     */
    private StepExecutionResult executeStep(StepDefinition stepDefinition, FlowContext context) {
        try {
            StepExecutor executor = flowEngine.getExecutor(stepDefinition.getType());
            if (executor == null) {
                throw new IllegalStateException("No executor found for step type: " + stepDefinition.getType());
            }
            
            return executor.execute(stepDefinition, context);
            
        } catch (Exception e) {
            logger.error("Parallel step threw an exception: {}", stepDefinition.getId(), e);
            
            return new StepExecutionResult(
                stepDefinition.getId(),
                stepDefinition.getType(),
                FlowExecutionResult.Status.FAILED,
                null,
                System.currentTimeMillis(),
                System.currentTimeMillis(),
                e
            );
        }
    }
    
    /**
     * Returns the steps to run in parallel.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getParallelSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("steps");
    }
    
    /**
     * Counts the steps that succeeded.
     */
    private long countSuccessful(List<StepExecutionResult> results) {
        return results.stream()
            .filter(r -> r.getStatus() == FlowExecutionResult.Status.SUCCESS)
            .count();
    }
    
    /**
     * Counts the steps that failed.
     */
    private long countFailed(List<StepExecutionResult> results) {
        return results.stream()
            .filter(r -> r.getStatus() == FlowExecutionResult.Status.FAILED)
            .count();
    }
    
    @Override
    public StepType getSupportedType() {
        return StepType.PARALLEL;
    }
    
    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        List<StepDefinition> steps = getParallelSteps(stepDefinition);
        if (steps == null || steps.isEmpty()) {
            return ValidationResult.error("A parallel step must configure at least one sub-step");
        }
        
        // Validate the timeout
        Integer timeoutSeconds = getConfig(stepDefinition, "timeoutSeconds", Integer.class);
        if (timeoutSeconds != null && timeoutSeconds <= 0) {
            return ValidationResult.error("The timeout must be greater than 0");
        }
        
        return ValidationResult.success();
    }
    
    /**
     * Shuts down the thread pool.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * Result of a parallel execution.
     */
    public static class ParallelExecutionResult {
        private final List<StepExecutionResult> results;
        
        public ParallelExecutionResult(List<StepExecutionResult> results) {
            this.results = results;
        }
        
        public List<StepExecutionResult> getResults() {
            return results;
        }
        
        public boolean hasFailures() {
            return results.stream()
                .anyMatch(r -> r.getStatus() == FlowExecutionResult.Status.FAILED);
        }
        
        public List<StepExecutionResult> getSuccessfulResults() {
            return results.stream()
                .filter(r -> r.getStatus() == FlowExecutionResult.Status.SUCCESS)
                .collect(Collectors.toList());
        }
        
        public List<StepExecutionResult> getFailedResults() {
            return results.stream()
                .filter(r -> r.getStatus() == FlowExecutionResult.Status.FAILED)
                .collect(Collectors.toList());
        }
    }
}
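
One detail of the timeout handling above is worth illustrating: cancelling a CompletableFuture does not interrupt the thread that is running the task. When a timed-out sub-step really has to stop, one possible alternative is ExecutorService.invokeAll with a timeout, which cancels unfinished tasks and interrupts their threads. The sketch below is a standalone illustration of that alternative, not the chapter's implementation; ParallelTimeoutSketch, runAll and the marker strings are hypothetical, and plain Callable<String> tasks stand in for sub-steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: plain Callables stand in for sub-steps.
class ParallelTimeoutSketch {

    /** Runs all tasks in parallel and returns each task's value or a failure marker. */
    static List<String> runAll(List<Callable<String>> tasks, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // Blocks until every task is done or the timeout elapses; tasks that are
            // still unfinished at the deadline are cancelled and their threads interrupted.
            List<Future<String>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);

            List<String> outcomes = new ArrayList<>();
            for (Future<String> future : futures) {
                try {
                    outcomes.add(future.get());
                } catch (CancellationException e) {
                    outcomes.add("TIMED_OUT");
                } catch (ExecutionException e) {
                    outcomes.add("FAILED: " + e.getCause().getMessage());
                }
            }
            return outcomes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "fast");
        tasks.add(() -> {
            Thread.sleep(5_000); // interrupted when the timeout fires
            return "slow";
        });
        tasks.add(() -> {
            throw new IllegalStateException("boom");
        });

        System.out.println(runAll(tasks, 300));
    }
}
```

The results come back in the same order as the submitted tasks, so each outcome can be matched to its sub-step by index.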

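🔄 LoopStepExecutor - the loop executor

/**
 * Loop step executor.
 * Runs its sub-steps at least once, then repeats while the configured condition
 * evaluates to true, up to maxIterations.
 */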
public class LoopStepExecutor extends AbstractStepExecutor {
    
    private final ExpressionEvaluator expressionEvaluator;
    private final FlowEngine flowEngine;
    
    public LoopStepExecutor(ExpressionEvaluator expressionEvaluator, FlowEngine flowEngine) {
        this.expressionEvaluator = expressionEvaluator;
        this.flowEngine = flowEngine;
    }
    
    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Read the loop configuration
        String condition = getConfig(stepDefinition, "condition", String.class);
        Integer maxIterations = getConfig(stepDefinition, "maxIterations", Integer.class);
        List<StepDefinition> loopSteps = getLoopSteps(stepDefinition);
        
        if (loopSteps == null || loopSteps.isEmpty()) {
            logger.warn("Loop step has no sub-steps configured");
            // Return the same result type as the non-empty case so callers can cast safely
            return new LoopExecutionResult(0, Collections.emptyList());
        }
        
        // Apply the default
        if (maxIterations == null) {
            maxIterations = 100; // at most 100 iterations by default
        }
        
        logger.debug("Starting loop, max iterations: {}, condition: {}", maxIterations, condition);
        
        List<LoopIterationResult> iterationResults = new ArrayList<>();
        int iteration = 0;
        
        try {
            while (iteration < maxIterations) {
                iteration++;
                
                logger.debug("Starting iteration {}", iteration);
                
                // Expose the iteration variables
                context.put("_iteration", iteration);
                context.put("_isFirstIteration", iteration == 1);
                
                // Run the loop body
                List<StepExecutionResult> stepResults = new ArrayList<>();
                boolean shouldBreak = false;
                
                for (StepDefinition step : loopSteps) {
                    StepExecutor executor = flowEngine.getExecutor(step.getType());
                    if (executor == null) {
                        throw new IllegalStateException("No executor found for step type: " + step.getType());
                    }
                    
                    StepExecutionResult result = executor.execute(step, context);
                    stepResults.add(result);
                    
                    // On failure, stop unless the step is configured with continueOnFailure
                    if (result.getStatus() == FlowExecutionResult.Status.FAILED) {
                        if (!shouldContinueOnFailure(step)) {
                            shouldBreak = true;
                            break;
                        }
                    }
                }
                
                iterationResults.add(new LoopIterationResult(iteration, stepResults));
                
                // Leave the loop if a step failed
                if (shouldBreak) {
                    logger.debug("Leaving loop because a step failed");
                    break;
                }
                
                // Check the condition after the body: the loop continues only while it is true
                if (condition != null && !condition.trim().isEmpty()) {
                    try {
                        boolean shouldContinue = expressionEvaluator.evaluateBoolean(condition, context);
                        if (!shouldContinue) {
                            logger.debug("Loop condition is false, ending loop");
                            break;
                        }
                    } catch (Exception e) {
                        logger.warn("Failed to evaluate loop condition: {}", condition, e);
                        break;
                    }
                }
            }
        } finally {
            // Always clean up the iteration variables, even if the loop body throws
            context.remove("_iteration");
            context.remove("_isFirstIteration");
        }
        
        logger.debug("Loop finished, total iterations: {}", iteration);
        
        return new LoopExecutionResult(iteration, iterationResults);
    }
    
    /**
     * Returns the steps that make up the loop body.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getLoopSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("steps");
    }
    
    /**
     * Whether execution should continue after this step fails.
     */
    private boolean shouldContinueOnFailure(StepDefinition stepDefinition) {
        Boolean continueOnFailure = getConfig(stepDefinition, "continueOnFailure", Boolean.class);
        return Boolean.TRUE.equals(continueOnFailure);
    }
    
    @Override
    public StepType getSupportedType() {
        return StepType.LOOP;
    }
    
    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        List<StepDefinition> steps = getLoopSteps(stepDefinition);
        if (steps == null || steps.isEmpty()) {
            return ValidationResult.error("A loop step must configure at least one sub-step");
        }
        
        // Validate the maximum number of iterations
        Integer maxIterations = getConfig(stepDefinition, "maxIterations", Integer.class);
        if (maxIterations != null && maxIterations <= 0) {
            return ValidationResult.error("The maximum number of iterations must be greater than 0");
        }
        
        return ValidationResult.success();
    }
    
    /**
     * Result of a single loop iteration.
     */
    public static class LoopIterationResult {
        private final int iteration;
        private final List<StepExecutionResult> stepResults;
        
        public LoopIterationResult(int iteration, List<StepExecutionResult> stepResults) {
            this.iteration = iteration;
            this.stepResults = stepResults;
        }
        
        // getters...
    }
    
    /**
     * Result of the whole loop.
     */
    public static class LoopExecutionResult {
        private final int totalIterations;
        private final List<LoopIterationResult> iterationResults;
        
        public LoopExecutionResult(int totalIterations, List<LoopIterationResult> iterationResults) {
            this.totalIterations = totalIterations;
            this.iterationResults = iterationResults;
        }
        
        // getters...
    }
}
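
The control flow of LoopStepExecutor is easy to misread: the body always runs once before the condition is checked, the loop continues while the condition is true, and maxIterations caps it either way. The sketch below reduces that flow to its core so the three cases can be tried in isolation. LoopSemanticsSketch and runLoop are hypothetical names, with a Runnable standing in for the sub-steps and a BooleanSupplier for the condition expression.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the loop's control flow only.
class LoopSemanticsSketch {

    /**
     * Runs body, then repeats while continueCondition is true, never exceeding
     * maxIterations. Returns the number of iterations that ran.
     */
    static int runLoop(Runnable body, BooleanSupplier continueCondition, int maxIterations) {
        int iteration = 0;
        while (iteration < maxIterations) {
            iteration++;
            body.run();
            // The condition is checked after the body, as in a do-while loop
            if (!continueCondition.getAsBoolean()) {
                break;
            }
        }
        return iteration;
    }

    public static void main(String[] args) {
        int[] counter = {0};
        int iterations = runLoop(() -> counter[0]++, () -> counter[0] < 5, 10);
        System.out.println(iterations + " iterations, counter = " + counter[0]);
    }
}
```

A consequence of checking after the body is that a condition which is already false at the start still lets the sub-steps run once.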

📝 ScriptStepExecutor - the script executor

/**
 * Script step executor.
 * Runs script code dynamically.
 */
public class ScriptStepExecutor extends AbstractStepExecutor {
    
    private final Map<String, ScriptEngine> scriptEngines = new ConcurrentHashMap<>();
    
    public ScriptStepExecutor() {
        initializeScriptEngines();
    }
    
    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
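        // Read the script configuration
        String scriptType = getRequiredConfig(stepDefinition, "scriptType", String.class);
        String script = getRequiredConfig(stepDefinition, "script", String.class);
        
        logger.debug("Running script: type={}, length={}", scriptType, script.length());
        
        // Look up the script engine
        ScriptEngine engine = getScriptEngine(scriptType);
        if (engine == null) {
            throw new IllegalArgumentException("Unsupported script type: " + scriptType);
        }
        
        Object result;
        // Engines are shared between steps and setupScriptContext replaces the engine's
        // bindings, so serialize access to keep parallel steps from seeing each other's variables
        synchronized (engine) {
            // Set up the script context
            setupScriptContext(engine, context);
            
            // Run the script
            result = engine.eval(script);
        }
        
        logger.debug("Script finished, result type: {}", 
            result != null ? result.getClass().getSimpleName() : "null");
        
        return result;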
    }
    
    /**
     * Initializes the script engines.
     */
    private void initializeScriptEngines() {
        ScriptEngineManager manager = new ScriptEngineManager();
        
        // JavaScript engine (Nashorn was removed from the JDK in Java 15, so newer JDKs
        // need an engine such as standalone Nashorn or GraalJS on the classpath)
        ScriptEngine jsEngine = manager.getEngineByName("javascript");
        if (jsEngine != null) {
            scriptEngines.put("javascript", jsEngine);
            scriptEngines.put("js", jsEngine);
        }
        
        // Groovy engine (if available)
        ScriptEngine groovyEngine = manager.getEngineByName("groovy");
        if (groovyEngine != null) {
            scriptEngines.put("groovy", groovyEngine);
        }
        
        // Kotlin engine (if available)
        ScriptEngine kotlinEngine = manager.getEngineByName("kotlin");
        if (kotlinEngine != null) {
            scriptEngines.put("kotlin", kotlinEngine);
            scriptEngines.put("kts", kotlinEngine);
        }
        
        logger.info("Initialized script engines: {}", scriptEngines.keySet());
    }
    
    /**
     * Looks up the script engine for a script type.
     */
    private ScriptEngine getScriptEngine(String scriptType) {
        return scriptEngines.get(scriptType.toLowerCase());
    }
    
    /**
     * Populates the engine's bindings from the flow context.
     */
    private void setupScriptContext(ScriptEngine engine, FlowContext context) {
        Bindings bindings = engine.createBindings();
        
        // Expose the context object itself
        bindings.put("context", context);
        
        // Expose every context variable as a top-level script variable
        for (Map.Entry<String, Object> entry : context.getAll().entrySet()) {
            bindings.put(entry.getKey(), entry.getValue());
        }
        
        // Expose helper objects. Note that binding the name "Math" may shadow
        // the built-in Math object in JavaScript scripts
        bindings.put("logger", logger);
        bindings.put("System", System.class);
        bindings.put("Math", Math.class);
        
        engine.setBindings(bindings, ScriptContext.ENGINE_SCOPE);
    }
    
    @Override
    public StepType getSupportedType() {
        return StepType.SCRIPT;
    }
    
    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        String scriptType = getConfig(stepDefinition, "scriptType", String.class);
        if (scriptType == null || scriptType.trim().isEmpty()) {
            return ValidationResult.error("A script step must specify the scriptType config");
        }
        
        String script = getConfig(stepDefinition, "script", String.class);
        if (script == null || script.trim().isEmpty()) {
            return ValidationResult.error("A script step must specify the script config");
        }
        
        // Check that a script engine is available
        if (getScriptEngine(scriptType) == null) {
            return ValidationResult.error("Unsupported script type: " + scriptType + 
                ", available types: " + scriptEngines.keySet());
        }
        
        return ValidationResult.success();
    }
}
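
Because the script engines are shared, the variables of one step should not leak into another. One way to keep them apart, shown here as a sketch rather than as the chapter's implementation, is to pass a fresh Bindings object to engine.eval(script, bindings) instead of replacing the engine's ENGINE_SCOPE. ScriptBindingsSketch, isAvailable and eval are hypothetical names and a plain map stands in for FlowContext. Whether a given engine is safe to share across threads still depends on the engine, so treat this as isolation of variables, not as a general thread-safety guarantee.

```java
import java.util.HashMap;
import java.util.Map;
import javax.script.Bindings;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

// Hypothetical sketch: evaluates a script with bindings that are local to one call.
class ScriptBindingsSketch {

    private static final ScriptEngineManager MANAGER = new ScriptEngineManager();

    /** Returns true if an engine with this name is installed in the running JVM. */
    static boolean isAvailable(String engineName) {
        return MANAGER.getEngineByName(engineName) != null;
    }

    static Object eval(String engineName, String script, Map<String, Object> variables) {
        ScriptEngine engine = MANAGER.getEngineByName(engineName);
        if (engine == null) {
            // For example "javascript" on JDK 15+ without an extra engine on the classpath
            throw new IllegalStateException("No script engine available for: " + engineName);
        }

        // Fresh bindings per call: the engine's own ENGINE_SCOPE is left untouched
        Bindings bindings = engine.createBindings();
        bindings.putAll(variables);

        try {
            return engine.eval(script, bindings);
        } catch (ScriptException e) {
            throw new IllegalArgumentException("Script failed: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> variables = new HashMap<>();
        variables.put("a", 10);
        variables.put("b", 20);

        if (isAvailable("javascript")) {
            System.out.println(eval("javascript", "a + b", variables));
        } else {
            System.out.println("No JavaScript engine installed in this JVM");
        }
    }
}
```

The numeric type of a script result depends on the engine, so callers are safer converting through Number than comparing against a boxed Integer.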

🧪 Executor test examples

public class StepExecutorTest {
    
    private FlowEngine flowEngine;
    private FlowContext context;
    
    @Before
    public void setUp() {
        flowEngine = new DefaultFlowEngine();
        context = new DefaultFlowContext();
        
        // Register the executors
        flowEngine.registerExecutor(StepType.CONDITIONAL, 
            new ConditionalStepExecutor(new SimpleExpressionEvaluator(), flowEngine));
        flowEngine.registerExecutor(StepType.PARALLEL, 
            new ParallelStepExecutor(flowEngine));
        flowEngine.registerExecutor(StepType.LOOP, 
            new LoopStepExecutor(new SimpleExpressionEvaluator(), flowEngine));
        flowEngine.registerExecutor(StepType.SCRIPT, 
            new ScriptStepExecutor());
    }
    
    @Test
    public void testConditionalExecutor() {
        // Build a conditional step
        StepDefinition conditionalStep = StepDefinitionBuilder
            .builder("age-check", StepType.CONDITIONAL)
            .config("condition", "context.get('age') >= 18")
            .config("trueSteps", Arrays.asList(
                StepDefinitionBuilder
                    .builder("adult-process", StepType.SIMPLE)
                    .config("action", "set")
                    .config("value", "Adult handling")
                    .config("targetKey", "result")
                    .build()
            ))
            .config("falseSteps", Arrays.asList(
                StepDefinitionBuilder
                    .builder("minor-process", StepType.SIMPLE)
                    .config("action", "set")
                    .config("value", "Minor handling")
                    .config("targetKey", "result")
                    .build()
            ))
            .build();
        
        // Adult case
        context.put("age", 25);
        StepExecutor executor = flowEngine.getExecutor(StepType.CONDITIONAL);
        StepExecutionResult result = executor.execute(conditionalStep, context);
        
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        assertEquals("Adult handling", context.get("result"));
        
        // Minor case
        context.clear();
        context.put("age", 16);
        result = executor.execute(conditionalStep, context);
        
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        assertEquals("Minor handling", context.get("result"));
    }
    
    @Test
    public void testParallelExecutor() {
        // Build a parallel step
        StepDefinition parallelStep = StepDefinitionBuilder
            .builder("parallel-tasks", StepType.PARALLEL)
            .config("timeoutSeconds", 10)
            .config("steps", Arrays.asList(
                StepDefinitionBuilder
                    .builder("task1", StepType.SIMPLE)
                    .config("action", "sleep")
                    .config("value", 100)
                    .build(),
                StepDefinitionBuilder
                    .builder("task2", StepType.SIMPLE)
                    .config("action", "sleep")
                    .config("value", 200)
                    .build()
            ))
            .build();
        
        long startTime = System.currentTimeMillis();
        
        StepExecutor executor = flowEngine.getExecutor(StepType.PARALLEL);
        StepExecutionResult result = executor.execute(parallelStep, context);
        
        long endTime = System.currentTimeMillis();
        
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
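        // Serial execution would take at least 100 + 200 = 300 ms
        assertTrue("Parallel execution should be faster than serial execution", endTime - startTime < 300);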
        
        ParallelStepExecutor.ParallelExecutionResult parallelResult = 
            (ParallelStepExecutor.ParallelExecutionResult) result.getResult();
        assertEquals(2, parallelResult.getResults().size());
        assertFalse(parallelResult.hasFailures());
    }
    
    @Test
    public void testLoopExecutor() {
        // Build a loop step
        StepDefinition loopStep = StepDefinitionBuilder
            .builder("counter-loop", StepType.LOOP)
            .config("condition", "context.get('counter', 0) < 5")
            .config("maxIterations", 10)
            .config("steps", Arrays.asList(
                StepDefinitionBuilder
                    .builder("increment", StepType.SIMPLE)
                    .config("action", "increment")
                    .config("value", "counter")
                    .config("targetKey", "counter")
                    .build()
            ))
            .build();
        
        context.put("counter", 0);
        
        StepExecutor executor = flowEngine.getExecutor(StepType.LOOP);
        StepExecutionResult result = executor.execute(loopStep, context);
        
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        assertEquals(5, (int) context.get("counter"));
        
        LoopStepExecutor.LoopExecutionResult loopResult = 
            (LoopStepExecutor.LoopExecutionResult) result.getResult();
        assertEquals(5, loopResult.getTotalIterations());
    }
    
    @Test
    public void testScriptExecutor() {
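        // Build a script step (the text block below needs Java 15+, where a JavaScript
        // engine such as standalone Nashorn or GraalJS must be added to the classpath)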
        StepDefinition scriptStep = StepDefinitionBuilder
            .builder("calculate", StepType.SCRIPT)
            .config("scriptType", "javascript")
            .config("script", """
                var a = context.get('a');
                var b = context.get('b');
                var result = a + b;
                context.put('sum', result);
                result;
            """)
            .build();
        
        context.put("a", 10);
        context.put("b", 20);
        
        StepExecutor executor = flowEngine.getExecutor(StepType.SCRIPT);
        StepExecutionResult result = executor.execute(scriptStep, context);
        
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
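        // The engine may return an Integer or a Double, so compare through Number
        assertEquals(30, ((Number) result.getResult()).intValue());
        assertEquals(30, ((Number) context.get("sum")).intValue());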
    }
}

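🎯 Design highlights

✨ 1. Template method pattern

  • 🏗️ Uniform execution flow: AbstractStepExecutor defines the standard execution template
  • 🔧 Flexible extension points: subclasses only need to implement the core logic
  • 🛡️ Uniform exception handling: the base class catches exceptions and turns them into failure results

✨ 2. Strategy pattern

  • 🎯 Pluggable executors: each step type uses its own execution strategy
  • 🔄 Runtime selection: the executor is chosen dynamically from the step type
  • 🚀 Easy to extend: adding a step type only requires implementing a new executor

✨ 3. Composite pattern

  • 🌳 Recursive execution: complex steps can contain sub-steps
  • 🔀 Uniform interface: simple and composite steps share the same interface
  • 📊 Hierarchy: steps can be nested to any depth

✨ 4. Concurrency safety

  • 🧵 Thread safety: shared state is kept in thread-safe data structures
  • 🔒 Context isolation: parallel execution creates an independent child context for each sub-step
  • ⚡ Asynchronous support: supports asynchronous and parallel execution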

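🎉 Summary

In this chapter we:

  • 🏗️ Designed the executor architecture: an extensible architecture built on AbstractStepExecutor
  • 🔀 Implemented the conditional executor: expression-based conditional branching
  • ⚡ Implemented the parallel executor: several tasks run in parallel
  • 🔄 Implemented the loop executor: conditional loops with iteration control
  • 📝 Implemented the script executor: dynamic execution in several script languages
  • 🧪 Provided tests: example tests that exercise each executor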

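🚀 Next steps

In the next chapter, "Context Management Mechanism", we will:

  • 📦 Design the FlowContext implementation in depth
  • 🔒 Implement thread-safe context management
  • 🌳 Design a hierarchical context structure
  • 🔄 Implement serialization and persistence of the context
  • 🧪 Write complete tests for context management

💡 Questions to think about:

  1. How would you design executor priorities and a selection strategy?
  2. How would you implement hot-plugging and dynamic loading of executors?
  3. How would you design executors so that they can be configured and parameterized?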