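🎯 Goal: Design an extensible step-executor architecture and implement the execution logic for each type of step
🤔 Why do we need multiple executors?
Different business scenarios call for different execution logic:
- 🔀 Conditional branching: choose an execution path based on a condition
- ⚡ Parallel execution: run several independent tasks at the same time
- 🔄 Loop execution: repeat a set of steps until a condition is met
- 📝 Script execution: run script code dynamically
- 🌐 Remote calls: invoke external services or APIs
🏗️ Executor architecture design
🎯 The AbstractStepExecutor base class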
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base class for step executors.
 * Provides the shared execution flow as a template method.
 */
public abstract class AbstractStepExecutor implements StepExecutor {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractStepExecutor.class);

    @Override
    public final StepExecutionResult execute(StepDefinition stepDefinition, FlowContext context) {
        long startTime = System.currentTimeMillis();
        String stepId = stepDefinition.getId();
        logger.debug("Starting step: {} [{}]", stepDefinition.getName(), stepId);
        try {
            // Pre-processing hook
            preExecute(stepDefinition, context);
            // Core logic
            Object result = doExecute(stepDefinition, context);
            // Post-processing hook
            postExecute(stepDefinition, context, result);
            long endTime = System.currentTimeMillis();
            logger.debug("Step succeeded: {} [{}], took {}ms",
                    stepDefinition.getName(), stepId, endTime - startTime);
            return createSuccessResult(stepDefinition, result, startTime, endTime);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers can still observe it
                Thread.currentThread().interrupt();
            }
            long endTime = System.currentTimeMillis();
            logger.error("Step failed: {} [{}], took {}ms",
                    stepDefinition.getName(), stepId, endTime - startTime, e);
            return createFailureResult(stepDefinition, e, startTime, endTime);
        }
    }
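
    /**
     * Pre-processing hook.
     * @param stepDefinition the step definition
     * @param context the execution context
     */
    protected void preExecute(StepDefinition stepDefinition, FlowContext context) {
        // No-op by default; subclasses may override
    }

    /**
     * Core logic (subclasses must implement this).
     * @param stepDefinition the step definition
     * @param context the execution context
     * @return the step's result
     */
    protected abstract Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception;

    /**
     * Post-processing hook.
     * @param stepDefinition the step definition
     * @param context the execution context
     * @param result the step's result
     */
    protected void postExecute(StepDefinition stepDefinition, FlowContext context, Object result) {
        // Store the result in the context under the configured key
        String resultKey = getResultKey(stepDefinition);
        if (resultKey != null && result != null) {
            context.put(resultKey, result);
        }
    }

    /**
     * Returns the context key under which the result is stored, or null if none is configured.
     */
    protected String getResultKey(StepDefinition stepDefinition) {
        // Go through getConfig so a non-String value fails with a descriptive message
        // instead of a bare ClassCastException
        return getConfig(stepDefinition, "resultKey", String.class);
    }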

    /**
     * Builds a success result.
     */
    protected StepExecutionResult createSuccessResult(
            StepDefinition stepDefinition, Object result, long startTime, long endTime) {
        return new StepExecutionResult(
                stepDefinition.getId(),
                stepDefinition.getType(),
                FlowExecutionResult.Status.SUCCESS,
                result,
                startTime,
                endTime,
                null
        );
    }

    /**
     * Builds a failure result.
     */
    protected StepExecutionResult createFailureResult(
            StepDefinition stepDefinition, Exception exception, long startTime, long endTime) {
        return new StepExecutionResult(
                stepDefinition.getId(),
                stepDefinition.getType(),
                FlowExecutionResult.Status.FAILED,
                null,
                startTime,
                endTime,
                exception
        );
    }

    /**
     * Returns a config value cast to the expected type, or null if the key is absent.
     */
    protected <T> T getConfig(StepDefinition stepDefinition, String key, Class<T> type) {
        Object value = stepDefinition.getConfig().get(key);
        if (value == null) {
            return null;
        }
        if (type.isInstance(value)) {
            return type.cast(value);
        }
        throw new IllegalArgumentException(
                String.format("Config %s has the wrong type, expected: %s, actual: %s",
                        key, type.getSimpleName(), value.getClass().getSimpleName())
        );
    }

    /**
     * Returns a required config value, throwing if it is missing.
     */
    protected <T> T getRequiredConfig(StepDefinition stepDefinition, String key, Class<T> type) {
        T value = getConfig(stepDefinition, key, type);
        if (value == null) {
            throw new IllegalArgumentException(
                    String.format("Step %s is missing required config: %s", stepDefinition.getId(), key)
            );
        }
        return value;
    }
}
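To see the template-method flow in isolation, here is a minimal sketch that replaces the chapter's types with plain Java stand-ins. `TemplateSketch`, `UpperCaseStep`, the string-based result, and the fixed `"result"` key are all hypothetical names invented for this illustration; they are not part of the engine.

```java
import java.util.Locale;
import java.util.Map;

/** Stand-in for the template-method flow; every name here is hypothetical. */
abstract class TemplateSketch {

    /** Fixed skeleton: pre-hook, core logic, post-hook; failures become a result. */
    final String run(Map<String, Object> context) {
        try {
            before(context);
            Object value = doRun(context);
            after(context, value);
            return "SUCCESS:" + value;
        } catch (Exception e) {
            return "FAILED:" + e.getMessage();
        }
    }

    /** Optional hook, empty by default. */
    void before(Map<String, Object> context) { }

    /** The only method a subclass has to provide. */
    abstract Object doRun(Map<String, Object> context) throws Exception;

    /** Default post-hook: store the value under a fixed key (assumption of this sketch). */
    void after(Map<String, Object> context, Object value) {
        if (value != null) {
            context.put("result", value);
        }
    }

    /** Concrete step: upper-cases the "name" entry of the context. */
    static final class UpperCaseStep extends TemplateSketch {
        @Override
        Object doRun(Map<String, Object> context) {
            Object name = context.get("name");
            if (name == null) {
                throw new IllegalArgumentException("missing name");
            }
            return name.toString().toUpperCase(Locale.ROOT);
        }
    }
}
```

Because `run` is `final`, a subclass cannot skip the hooks or the error handling; it can only fill in `doRun` and, if needed, override the hooks.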
🔀 ConditionalStepExecutor - Conditional executor
import java.util.ArrayList;
import java.util.List;

/**
 * Conditional step executor.
 * Evaluates a condition expression and runs the matching branch.
 */
public class ConditionalStepExecutor extends AbstractStepExecutor {
    private final ExpressionEvaluator expressionEvaluator;
    private final FlowEngine flowEngine;

    public ConditionalStepExecutor(ExpressionEvaluator expressionEvaluator, FlowEngine flowEngine) {
        this.expressionEvaluator = expressionEvaluator;
        this.flowEngine = flowEngine;
    }

    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Read the condition expression
        String condition = getRequiredConfig(stepDefinition, "condition", String.class);
        // Evaluate it
        boolean conditionResult = expressionEvaluator.evaluateBoolean(condition, context);
        logger.debug("Condition evaluated: {} -> {}", condition, conditionResult);
        // Pick the branch to run
        List<StepDefinition> targetSteps = conditionResult ?
                getTrueSteps(stepDefinition) : getFalseSteps(stepDefinition);
        if (targetSteps == null || targetSteps.isEmpty()) {
            logger.debug("No branch configured for this outcome");
            return conditionResult;
        }
        // Run the steps of the selected branch in order
        List<StepExecutionResult> branchResults = new ArrayList<>();
        for (StepDefinition step : targetSteps) {
            StepExecutor executor = flowEngine.getExecutor(step.getType());
            if (executor == null) {
                throw new IllegalStateException("No executor found for step type: " + step.getType());
            }
            StepExecutionResult result = executor.execute(step, context);
            branchResults.add(result);
            // On failure, stop unless the step is configured with continueOnFailure
            if (result.getStatus() == FlowExecutionResult.Status.FAILED &&
                    !shouldContinueOnFailure(step)) {
                break;
            }
        }
        // Note: a failed branch step does not fail the conditional step itself;
        // callers have to inspect branchResults
        return new ConditionalExecutionResult(conditionResult, branchResults);
    }

    /**
     * Returns the steps to run when the condition is true.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getTrueSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("trueSteps");
    }

    /**
     * Returns the steps to run when the condition is false.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getFalseSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("falseSteps");
    }

    /**
     * Whether execution should continue after this step fails.
     */
    private boolean shouldContinueOnFailure(StepDefinition stepDefinition) {
        Boolean continueOnFailure = getConfig(stepDefinition, "continueOnFailure", Boolean.class);
        return Boolean.TRUE.equals(continueOnFailure);
    }

    @Override
    public StepType getSupportedType() {
        return StepType.CONDITIONAL;
    }

    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        // The condition expression is mandatory
        String condition = getConfig(stepDefinition, "condition", String.class);
        if (condition == null || condition.trim().isEmpty()) {
            return ValidationResult.error("A conditional step must specify the condition config");
        }
        // At least one branch must be present
        List<StepDefinition> trueSteps = getTrueSteps(stepDefinition);
        List<StepDefinition> falseSteps = getFalseSteps(stepDefinition);
        if ((trueSteps == null || trueSteps.isEmpty()) &&
                (falseSteps == null || falseSteps.isEmpty())) {
            return ValidationResult.error("A conditional step must specify at least one branch");
        }
        return ValidationResult.success();
    }

    /**
     * Result of a conditional step.
     */
    public static class ConditionalExecutionResult {
        private final boolean conditionResult;
        private final List<StepExecutionResult> branchResults;

        public ConditionalExecutionResult(boolean conditionResult, List<StepExecutionResult> branchResults) {
            this.conditionResult = conditionResult;
            this.branchResults = branchResults;
        }
        // getters...
    }
}
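⚡ ParallelStepExecutor - Parallel executor
import com.google.common.util.concurrent.ThreadFactoryBuilder; // Guava
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
 * Parallel step executor.
 * Runs several independent steps at the same time.
 */
public class ParallelStepExecutor extends AbstractStepExecutor {
    private final FlowEngine flowEngine;
    private final ExecutorService executorService;

    public ParallelStepExecutor(FlowEngine flowEngine) {
        this.flowEngine = flowEngine;
        // A cached pool grows without bound; consider a bounded pool if the
        // number of parallel steps is not under your control
        this.executorService = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder()
                        .setNameFormat("parallel-step-%d")
                        .setDaemon(true)
                        .build()
        );
    }

    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Read the steps to run in parallel
        List<StepDefinition> parallelSteps = getParallelSteps(stepDefinition);
        if (parallelSteps == null || parallelSteps.isEmpty()) {
            logger.warn("Parallel step has no child steps configured");
            return Collections.emptyList();
        }
        // Read the timeout
        Integer timeoutSeconds = getConfig(stepDefinition, "timeoutSeconds", Integer.class);
        long timeout = timeoutSeconds != null ? timeoutSeconds : 300; // default: 5 minutes
        logger.debug("Running {} steps in parallel, timeout: {}s", parallelSteps.size(), timeout);
        // Submit one task per step, each with its own child context
        List<CompletableFuture<StepExecutionResult>> futures = new ArrayList<>();
        for (StepDefinition step : parallelSteps) {
            CompletableFuture<StepExecutionResult> future = CompletableFuture.supplyAsync(
                    () -> executeStep(step, context.createChild()),
                    executorService
            );
            futures.add(future);
        }
        // Wait for all tasks to finish
        try {
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0])
            );
            allFutures.get(timeout, TimeUnit.SECONDS);
            // Collect the results
            List<StepExecutionResult> results = new ArrayList<>();
            for (CompletableFuture<StepExecutionResult> future : futures) {
                results.add(future.get());
            }
            logger.debug("Parallel execution finished, succeeded: {}, failed: {}",
                    countSuccessful(results), countFailed(results));
            return new ParallelExecutionResult(results);
        } catch (TimeoutException e) {
            // Mark unfinished futures as cancelled. CompletableFuture.cancel ignores
            // mayInterruptIfRunning, so a step that is already running is NOT
            // interrupted and keeps its pool thread until it returns.
            for (CompletableFuture<StepExecutionResult> future : futures) {
                future.cancel(true);
            }
            throw new RuntimeException("Parallel execution timed out after " + timeout + "s", e);
        }
    }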

    /**
     * Runs a single step.
     */
    private StepExecutionResult executeStep(StepDefinition stepDefinition, FlowContext context) {
        try {
            StepExecutor executor = flowEngine.getExecutor(stepDefinition.getType());
            if (executor == null) {
                throw new IllegalStateException("No executor found for step type: " + stepDefinition.getType());
            }
            return executor.execute(stepDefinition, context);
        } catch (Exception e) {
            logger.error("Parallel step threw an exception: {}", stepDefinition.getId(), e);
            // Use a single timestamp so start and end cannot disagree
            long now = System.currentTimeMillis();
            return new StepExecutionResult(
                    stepDefinition.getId(),
                    stepDefinition.getType(),
                    FlowExecutionResult.Status.FAILED,
                    null,
                    now,
                    now,
                    e
            );
        }
    }

    /**
     * Returns the steps to run in parallel.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getParallelSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("steps");
    }

    /**
     * Counts the steps that succeeded.
     */
    private long countSuccessful(List<StepExecutionResult> results) {
        return results.stream()
                .filter(r -> r.getStatus() == FlowExecutionResult.Status.SUCCESS)
                .count();
    }

    /**
     * Counts the steps that failed.
     */
    private long countFailed(List<StepExecutionResult> results) {
        return results.stream()
                .filter(r -> r.getStatus() == FlowExecutionResult.Status.FAILED)
                .count();
    }

    @Override
    public StepType getSupportedType() {
        return StepType.PARALLEL;
    }

    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        List<StepDefinition> steps = getParallelSteps(stepDefinition);
        if (steps == null || steps.isEmpty()) {
            return ValidationResult.error("A parallel step must configure at least one child step");
        }
        // Validate the timeout
        Integer timeoutSeconds = getConfig(stepDefinition, "timeoutSeconds", Integer.class);
        if (timeoutSeconds != null && timeoutSeconds <= 0) {
            return ValidationResult.error("The timeout must be greater than 0");
        }
        return ValidationResult.success();
    }

    /**
     * Shuts down the thread pool.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Result of a parallel step.
     */
    public static class ParallelExecutionResult {
        private final List<StepExecutionResult> results;

        public ParallelExecutionResult(List<StepExecutionResult> results) {
            this.results = results;
        }

        public List<StepExecutionResult> getResults() {
            return results;
        }

        public boolean hasFailures() {
            return results.stream()
                    .anyMatch(r -> r.getStatus() == FlowExecutionResult.Status.FAILED);
        }

        public List<StepExecutionResult> getSuccessfulResults() {
            return results.stream()
                    .filter(r -> r.getStatus() == FlowExecutionResult.Status.SUCCESS)
                    .collect(Collectors.toList());
        }

        public List<StepExecutionResult> getFailedResults() {
            return results.stream()
                    .filter(r -> r.getStatus() == FlowExecutionResult.Status.FAILED)
                    .collect(Collectors.toList());
        }
    }
}
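One thing to keep in mind about the timeout path: `CompletableFuture.cancel(true)` does not interrupt a task that is already running, so an overrunning step keeps its thread. If interrupting stragglers matters, one alternative (not what this chapter's executor does) is `ExecutorService.invokeAll` with a timeout, which cancels unfinished tasks with an interrupt. The sketch below shows that idea with plain `Callable`s; `TimeoutSketch`, `runAll`, and the `fallback` parameter are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: run tasks in parallel with a deadline that interrupts stragglers. */
final class TimeoutSketch {

    /**
     * Runs every task on the pool and waits at most timeoutMillis.
     * Tasks still running at the deadline are cancelled (with an interrupt);
     * their slot in the returned list holds the fallback value, as does the
     * slot of any task that threw.
     */
    static <T> List<T> runAll(ExecutorService pool, List<Callable<T>> tasks,
                              long timeoutMillis, T fallback) throws InterruptedException {
        // invokeAll returns once all tasks are done or the timeout expires
        List<Future<T>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            if (future.isCancelled()) {
                results.add(fallback); // did not finish before the deadline
                continue;
            }
            try {
                results.add(future.get()); // already complete, returns immediately
            } catch (ExecutionException e) {
                results.add(fallback); // the task itself threw
            }
        }
        return results;
    }
}
```

The results come back in the same order as the submitted tasks, which makes it straightforward to map each slot back to its step.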
🔄 LoopStepExecutor - Loop executor
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Loop step executor.
 * Runs its child steps at least once, then repeats while the configured
 * condition evaluates to true, up to maxIterations.
 */
public class LoopStepExecutor extends AbstractStepExecutor {
    private final ExpressionEvaluator expressionEvaluator;
    private final FlowEngine flowEngine;

    public LoopStepExecutor(ExpressionEvaluator expressionEvaluator, FlowEngine flowEngine) {
        this.expressionEvaluator = expressionEvaluator;
        this.flowEngine = flowEngine;
    }

    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Read the loop configuration
        String condition = getConfig(stepDefinition, "condition", String.class);
        Integer maxIterations = getConfig(stepDefinition, "maxIterations", Integer.class);
        List<StepDefinition> loopSteps = getLoopSteps(stepDefinition);
        if (loopSteps == null || loopSteps.isEmpty()) {
            logger.warn("Loop step has no child steps configured");
            return Collections.emptyList();
        }
        // Apply defaults
        if (maxIterations == null) {
            maxIterations = 100; // default cap: 100 iterations
        }
        logger.debug("Starting loop, max iterations: {}, condition: {}", maxIterations, condition);
        List<LoopIterationResult> iterationResults = new ArrayList<>();
        int iteration = 0;
        try {
            while (iteration < maxIterations) {
                iteration++;
                logger.debug("Starting iteration {}", iteration);
                // Expose the iteration variables to the loop body
                context.put("_iteration", iteration);
                context.put("_isFirstIteration", iteration == 1);
                // Run the loop body
                List<StepExecutionResult> stepResults = new ArrayList<>();
                boolean shouldBreak = false;
                for (StepDefinition step : loopSteps) {
                    StepExecutor executor = flowEngine.getExecutor(step.getType());
                    if (executor == null) {
                        throw new IllegalStateException("No executor found for step type: " + step.getType());
                    }
                    StepExecutionResult result = executor.execute(step, context);
                    stepResults.add(result);
                    // On failure, stop unless the step is configured with continueOnFailure
                    if (result.getStatus() == FlowExecutionResult.Status.FAILED) {
                        if (!shouldContinueOnFailure(step)) {
                            shouldBreak = true;
                            break;
                        }
                    }
                }
                iterationResults.add(new LoopIterationResult(iteration, stepResults));
                // Leave the loop if a step failed
                if (shouldBreak) {
                    logger.debug("Leaving the loop because a step failed");
                    break;
                }
                // The condition is a continue condition, checked after the body
                // (do-while semantics): the loop ends when it evaluates to false
                if (condition != null && !condition.trim().isEmpty()) {
                    try {
                        boolean shouldContinue = expressionEvaluator.evaluateBoolean(condition, context);
                        if (!shouldContinue) {
                            logger.debug("Loop condition is false, ending the loop");
                            break;
                        }
                    } catch (Exception e) {
                        logger.warn("Failed to evaluate loop condition: {}", condition, e);
                        break;
                    }
                }
            }
        } finally {
            // Always remove the iteration variables, even if an iteration throws
            context.remove("_iteration");
            context.remove("_isFirstIteration");
        }
        logger.debug("Loop finished, total iterations: {}", iteration);
        return new LoopExecutionResult(iteration, iterationResults);
    }

    /**
     * Returns the steps that make up the loop body.
     */
    @SuppressWarnings("unchecked")
    private List<StepDefinition> getLoopSteps(StepDefinition stepDefinition) {
        return (List<StepDefinition>) stepDefinition.getConfig().get("steps");
    }

    /**
     * Whether execution should continue after this step fails.
     */
    private boolean shouldContinueOnFailure(StepDefinition stepDefinition) {
        Boolean continueOnFailure = getConfig(stepDefinition, "continueOnFailure", Boolean.class);
        return Boolean.TRUE.equals(continueOnFailure);
    }

    @Override
    public StepType getSupportedType() {
        return StepType.LOOP;
    }

    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        List<StepDefinition> steps = getLoopSteps(stepDefinition);
        if (steps == null || steps.isEmpty()) {
            return ValidationResult.error("A loop step must configure at least one child step");
        }
        // Validate the iteration cap
        Integer maxIterations = getConfig(stepDefinition, "maxIterations", Integer.class);
        if (maxIterations != null && maxIterations <= 0) {
            return ValidationResult.error("The maximum number of iterations must be greater than 0");
        }
        return ValidationResult.success();
    }

    /**
     * Result of a single loop iteration.
     */
    public static class LoopIterationResult {
        private final int iteration;
        private final List<StepExecutionResult> stepResults;

        public LoopIterationResult(int iteration, List<StepExecutionResult> stepResults) {
            this.iteration = iteration;
            this.stepResults = stepResults;
        }
        // getters...
    }

    /**
     * Result of the whole loop.
     */
    public static class LoopExecutionResult {
        private final int totalIterations;
        private final List<LoopIterationResult> iterationResults;

        public LoopExecutionResult(int totalIterations, List<LoopIterationResult> iterationResults) {
            this.totalIterations = totalIterations;
            this.iterationResults = iterationResults;
        }
        // getters...
    }
}
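The loop's control flow is easy to misread: the body always runs first, the condition is then evaluated as a "keep going" test, and `maxIterations` caps the total. The following sketch strips that down to plain Java so the semantics can be checked in isolation. `LoopSketch` and `runLoop` are hypothetical names, and `Runnable`/`BooleanSupplier` stand in for the child steps and the expression evaluator.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical reduction of the loop executor's control flow. */
final class LoopSketch {

    /**
     * Runs body at least once, then repeats while keepGoing returns true,
     * never exceeding maxIterations. Returns the number of iterations run.
     */
    static int runLoop(Runnable body, BooleanSupplier keepGoing, int maxIterations) {
        if (maxIterations <= 0) {
            throw new IllegalArgumentException("maxIterations must be greater than 0");
        }
        int iteration = 0;
        while (iteration < maxIterations) {
            iteration++;
            body.run();
            // Checked after the body, so this behaves like a capped do-while
            if (!keepGoing.getAsBoolean()) {
                break;
            }
        }
        return iteration;
    }
}
```

With a counter that starts at 0, a body that increments it, and the condition `counter < 5`, this runs five iterations; if the counter starts at 10, the body still runs once before the condition ends the loop.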
📝 ScriptStepExecutor - Script executor
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

/**
 * Script step executor.
 * Runs dynamic script code.
 */
public class ScriptStepExecutor extends AbstractStepExecutor {
    private final Map<String, ScriptEngine> scriptEngines = new ConcurrentHashMap<>();

    public ScriptStepExecutor() {
        initializeScriptEngines();
    }

    @Override
    protected Object doExecute(StepDefinition stepDefinition, FlowContext context) throws Exception {
        // Read the script configuration
        String scriptType = getRequiredConfig(stepDefinition, "scriptType", String.class);
        String script = getRequiredConfig(stepDefinition, "script", String.class);
        logger.debug("Running script: type={}, length={}", scriptType, script.length());
        // Look up the script engine
        ScriptEngine engine = getScriptEngine(scriptType);
        if (engine == null) {
            throw new IllegalArgumentException("Unsupported script type: " + scriptType);
        }
        // Build bindings for this call only. The engines are shared, so replacing the
        // engine-wide bindings would let concurrent steps overwrite each other's variables.
        Bindings bindings = createBindings(engine, context);
        // Run the script against those bindings
        Object result = engine.eval(script, bindings);
        logger.debug("Script finished, result type: {}",
                result != null ? result.getClass().getSimpleName() : "null");
        return result;
    }

    /**
     * Initializes the script engines.
     */
    private void initializeScriptEngines() {
        ScriptEngineManager manager = new ScriptEngineManager();
        // JavaScript engine. Nashorn was removed from the JDK in Java 15, so on newer
        // JDKs this is null unless a JavaScript engine is on the classpath.
        ScriptEngine jsEngine = manager.getEngineByName("javascript");
        if (jsEngine != null) {
            scriptEngines.put("javascript", jsEngine);
            scriptEngines.put("js", jsEngine);
        }
        // Groovy engine (if available)
        ScriptEngine groovyEngine = manager.getEngineByName("groovy");
        if (groovyEngine != null) {
            scriptEngines.put("groovy", groovyEngine);
        }
        // Kotlin engine (if available)
        ScriptEngine kotlinEngine = manager.getEngineByName("kotlin");
        if (kotlinEngine != null) {
            scriptEngines.put("kotlin", kotlinEngine);
            scriptEngines.put("kts", kotlinEngine);
        }
        logger.info("Initialized script engines: {}", scriptEngines.keySet());
    }

    /**
     * Returns the engine for a script type, or null if it is not available.
     */
    private ScriptEngine getScriptEngine(String scriptType) {
        return scriptEngines.get(scriptType.toLowerCase());
    }

    /**
     * Creates the bindings a script sees for one execution.
     */
    private Bindings createBindings(ScriptEngine engine, FlowContext context) {
        Bindings bindings = engine.createBindings();
        // Expose the context object itself
        bindings.put("context", context);
        // Expose every context variable by name
        for (Map.Entry<String, Object> entry : context.getAll().entrySet()) {
            bindings.put(entry.getKey(), entry.getValue());
        }
        // Expose helpers. This hands JVM classes to the script, which is only
        // appropriate when the scripts come from a trusted source.
        bindings.put("logger", logger);
        bindings.put("System", System.class);
        bindings.put("Math", Math.class);
        return bindings;
    }

    @Override
    public StepType getSupportedType() {
        return StepType.SCRIPT;
    }

    @Override
    public ValidationResult validate(StepDefinition stepDefinition) {
        String scriptType = getConfig(stepDefinition, "scriptType", String.class);
        if (scriptType == null || scriptType.trim().isEmpty()) {
            return ValidationResult.error("A script step must specify the scriptType config");
        }
        String script = getConfig(stepDefinition, "script", String.class);
        if (script == null || script.trim().isEmpty()) {
            return ValidationResult.error("A script step must specify the script config");
        }
        // Check that an engine is available for this type
        if (getScriptEngine(scriptType) == null) {
            return ValidationResult.error("Unsupported script type: " + scriptType +
                    ", available types: " + scriptEngines.keySet());
        }
        return ValidationResult.success();
    }
}
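🧪 Executor test examples
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;

public class StepExecutorTest {
    private FlowEngine flowEngine;
    private FlowContext context;

    @Before
    public void setUp() {
        flowEngine = new DefaultFlowEngine();
        context = new DefaultFlowContext();
        // Register the executors from this chapter. The child steps below use
        // StepType.SIMPLE, so these tests assume DefaultFlowEngine already has
        // an executor registered for that type.
        flowEngine.registerExecutor(StepType.CONDITIONAL,
                new ConditionalStepExecutor(new SimpleExpressionEvaluator(), flowEngine));
        flowEngine.registerExecutor(StepType.PARALLEL,
                new ParallelStepExecutor(flowEngine));
        flowEngine.registerExecutor(StepType.LOOP,
                new LoopStepExecutor(new SimpleExpressionEvaluator(), flowEngine));
        flowEngine.registerExecutor(StepType.SCRIPT,
                new ScriptStepExecutor());
    }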

    @Test
    public void testConditionalExecutor() {
        // Build the conditional step
        StepDefinition conditionalStep = StepDefinitionBuilder
                .builder("age-check", StepType.CONDITIONAL)
                .config("condition", "context.get('age') >= 18")
                .config("trueSteps", Arrays.asList(
                        StepDefinitionBuilder
                                .builder("adult-process", StepType.SIMPLE)
                                .config("action", "set")
                                .config("value", "adult handling")
                                .config("targetKey", "result")
                                .build()
                ))
                .config("falseSteps", Arrays.asList(
                        StepDefinitionBuilder
                                .builder("minor-process", StepType.SIMPLE)
                                .config("action", "set")
                                .config("value", "minor handling")
                                .config("targetKey", "result")
                                .build()
                ))
                .build();
        // Adult case
        context.put("age", 25);
        StepExecutor executor = flowEngine.getExecutor(StepType.CONDITIONAL);
        StepExecutionResult result = executor.execute(conditionalStep, context);
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        assertEquals("adult handling", context.get("result"));
        // Minor case
        context.clear();
        context.put("age", 16);
        result = executor.execute(conditionalStep, context);
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        assertEquals("minor handling", context.get("result"));
    }

    @Test
    public void testParallelExecutor() {
        // Build the parallel step
        StepDefinition parallelStep = StepDefinitionBuilder
                .builder("parallel-tasks", StepType.PARALLEL)
                .config("timeoutSeconds", 10)
                .config("steps", Arrays.asList(
                        StepDefinitionBuilder
                                .builder("task1", StepType.SIMPLE)
                                .config("action", "sleep")
                                .config("value", 100)
                                .build(),
                        StepDefinitionBuilder
                                .builder("task2", StepType.SIMPLE)
                                .config("action", "sleep")
                                .config("value", 200)
                                .build()
                ))
                .build();
        long startTime = System.currentTimeMillis();
        StepExecutor executor = flowEngine.getExecutor(StepType.PARALLEL);
        StepExecutionResult result = executor.execute(parallelStep, context);
        long endTime = System.currentTimeMillis();
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        // Serial execution would need at least 300 ms (100 + 200). Timing assertions
        // like this can still be flaky on a loaded machine.
        assertTrue("Parallel execution should be faster than serial execution", endTime - startTime < 300);
        ParallelStepExecutor.ParallelExecutionResult parallelResult =
                (ParallelStepExecutor.ParallelExecutionResult) result.getResult();
        assertEquals(2, parallelResult.getResults().size());
        assertFalse(parallelResult.hasFailures());
    }

    @Test
    public void testLoopExecutor() {
        // Build the loop step; the condition is checked after each iteration
        StepDefinition loopStep = StepDefinitionBuilder
                .builder("counter-loop", StepType.LOOP)
                .config("condition", "context.get('counter', 0) < 5")
                .config("maxIterations", 10)
                .config("steps", Arrays.asList(
                        StepDefinitionBuilder
                                .builder("increment", StepType.SIMPLE)
                                .config("action", "increment")
                                .config("value", "counter")
                                .config("targetKey", "counter")
                                .build()
                ))
                .build();
        context.put("counter", 0);
        StepExecutor executor = flowEngine.getExecutor(StepType.LOOP);
        StepExecutionResult result = executor.execute(loopStep, context);
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        assertEquals(5, (int) context.get("counter"));
        LoopStepExecutor.LoopExecutionResult loopResult =
                (LoopStepExecutor.LoopExecutionResult) result.getResult();
        assertEquals(5, loopResult.getTotalIterations());
    }

    @Test
    public void testScriptExecutor() {
        // Build the script step. This needs a JavaScript engine on the classpath;
        // JDK 15 and later no longer bundle one.
        StepDefinition scriptStep = StepDefinitionBuilder
                .builder("calculate", StepType.SCRIPT)
                .config("scriptType", "javascript")
                .config("script", """
                        var a = context.get('a');
                        var b = context.get('b');
                        var result = a + b;
                        context.put('sum', result);
                        result;
                        """)
                .build();
        context.put("a", 10);
        context.put("b", 20);
        StepExecutor executor = flowEngine.getExecutor(StepType.SCRIPT);
        StepExecutionResult result = executor.execute(scriptStep, context);
        assertEquals(FlowExecutionResult.Status.SUCCESS, result.getStatus());
        // Whether the engine hands back an Integer or a Double depends on the engine,
        // so compare through Number
        assertEquals(30, ((Number) result.getResult()).intValue());
        assertEquals(30, ((Number) context.get("sum")).intValue());
    }
}
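🎯 Design highlights
✨ 1. Template method pattern
- 🏗️ One execution flow: AbstractStepExecutor defines the standard execution template
- 🔧 Flexible extension points: subclasses only implement the core logic
- 🛡️ Uniform exception handling: the base class turns exceptions into failure results
✨ 2. Strategy pattern
- 🎯 Pluggable executors: each step type has its own execution strategy
- 🔄 Runtime selection: the executor is looked up by step type at run time
- 🚀 Easy to extend: a new step type only needs a new executor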
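The lookup-by-type idea can be sketched as a small registry. This is a hedged illustration only: `RegistrySketch` and its `Kind` enum are hypothetical stand-ins for the engine and `StepType`, and a `Function<String, String>` stands in for a full executor.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry showing strategy selection by step type. */
final class RegistrySketch {

    /** Stand-in for the chapter's StepType. */
    enum Kind { UPPER, REVERSE }

    private final Map<Kind, Function<String, String>> executors = new EnumMap<>(Kind.class);

    /** Registers (or replaces) the strategy for a type. */
    void register(Kind kind, Function<String, String> executor) {
        executors.put(kind, executor);
    }

    /** Picks the strategy at run time and fails loudly if none is registered. */
    String execute(Kind kind, String input) {
        Function<String, String> executor = executors.get(kind);
        if (executor == null) {
            throw new IllegalStateException("No executor registered for " + kind);
        }
        return executor.apply(input);
    }
}
```

Adding a new kind of step then means adding an enum constant and one `register` call; the dispatch code in `execute` does not change.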
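✨ 3. Composite pattern
- 🌳 Recursive execution: a complex step can contain child steps
- 🔀 One interface: simple and composite steps are executed through the same interface
- 📊 Hierarchy: steps can be nested to any depth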
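A compact way to picture this is a leaf and a composite that share one interface. The sketch below uses hypothetical names (`CompositeSketch`, `Step`, `Leaf`, `Sequence`) and records execution order in a list instead of running real steps.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical leaf/composite pair sharing one interface. */
final class CompositeSketch {

    /** Shared interface for simple and composite steps. */
    interface Step {
        void run(List<String> trace);
    }

    /** Leaf: records its own name. */
    static final class Leaf implements Step {
        private final String name;

        Leaf(String name) {
            this.name = name;
        }

        @Override
        public void run(List<String> trace) {
            trace.add(name);
        }
    }

    /** Composite: runs its children in order; a child may itself be a composite. */
    static final class Sequence implements Step {
        private final List<Step> children;

        Sequence(Step... children) {
            this.children = Arrays.asList(children);
        }

        @Override
        public void run(List<String> trace) {
            for (Step child : children) {
                child.run(trace);
            }
        }
    }
}
```

The caller only ever sees `Step`, so nesting a `Sequence` inside another `Sequence` needs no special handling.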
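✨ 4. Concurrency safety
- 🧵 Thread safety: shared state is kept in thread-safe data structures such as ConcurrentHashMap
- 🔒 Context isolation: each parallel branch runs against its own child context
- ⚡ Async support: parallel branches run asynchronously on a thread pool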
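The chapter does not show how `createChild()` is implemented, so the following is only one plausible shape for it, under the assumption that a child reads through to its parent but writes locally. `ScopedContext` and its methods are hypothetical names for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical context with parent fallback; writes never leak upward. */
final class ScopedContext {
    private final ScopedContext parent; // null for the root context
    private final Map<String, Object> values = new ConcurrentHashMap<>();

    ScopedContext() {
        this(null);
    }

    private ScopedContext(ScopedContext parent) {
        this.parent = parent;
    }

    /** Creates a child whose writes stay local to it. */
    ScopedContext createChild() {
        return new ScopedContext(this);
    }

    /** Stores a value in this scope only (ConcurrentHashMap rejects null values). */
    void put(String key, Object value) {
        values.put(key, value);
    }

    /** Looks in this scope first, then walks up the parent chain. */
    Object get(String key) {
        Object value = values.get(key);
        if (value != null || parent == null) {
            return value;
        }
        return parent.get(key);
    }
}
```

With this shape, two parallel branches can both write a key such as `tmp` without seeing each other's value, while still reading whatever the parent held when they started. Merging child results back into the parent is a separate design decision that this sketch leaves out.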
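🎉 Summary
In this chapter we:
- 🏗️ Designed the executor architecture: an extensible design built on AbstractStepExecutor
- 🔀 Implemented the conditional executor: expression-based branching
- ⚡ Implemented the parallel executor: several tasks run at the same time
- 🔄 Implemented the loop executor: condition-driven loops with an iteration cap
- 📝 Implemented the script executor: dynamic execution in several script languages
- 🧪 Provided tests: one test per executor to verify its behavior
🚀 Next steps
In the next chapter, "Context Management", we will:
- 📦 Design the FlowContext implementation in depth
- 🔒 Implement thread-safe context management
- 🌳 Design a hierarchical context structure
- 🔄 Implement context serialization and persistence
- 🧪 Write tests for context management
💡 Questions to think about:
- How would you design executor priorities and a selection strategy?
- How would you support hot-swapping and dynamic loading of executors?
- How would you design executors so that they can be configured and parameterized?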