🎯 目標: 實現流程編排框架的核心執行引擎
🤔 流程引擎的職責
流程引擎是整個框架的心臟,負責:
- 🎯 流程調度: 按照定義的順序執行步驟
- 🔧 執行器管理: 為不同類型的步驟選擇合適的執行器
- 📊 狀態跟蹤: 記錄流程和步驟的執行狀態
- 🛡️ 異常處理: 處理執行過程中的異常情況
- 📈 性能監控: 收集執行時間和性能指標
🏗️ 引擎架構設計
🎯 FlowEngine接口設計
/**
* 流程引擎接口
* 負責執行流程定義,管理執行狀態和上下文
*/
public interface FlowEngine {
/**
* 執行流程
* @param flowDefinition 流程定義
* @param context 執行上下文
* @return 執行結果
*/
FlowExecutionResult execute(FlowDefinition flowDefinition, FlowContext context);
/**
* 異步執行流程
* @param flowDefinition 流程定義
* @param context 執行上下文
* @return 異步執行結果
*/
CompletableFuture<FlowExecutionResult> executeAsync(FlowDefinition flowDefinition, FlowContext context);
/**
* 註冊步驟執行器
* @param stepType 步驟類型
* @param executor 執行器實例
*/
void registerExecutor(StepType stepType, StepExecutor executor);
/**
* 獲取步驟執行器
* @param stepType 步驟類型
* @return 執行器實例
*/
StepExecutor getExecutor(StepType stepType);
/**
* 添加執行監聽器
* @param listener 監聽器
*/
void addListener(FlowExecutionListener listener);
/**
* 移除執行監聽器
* @param listener 監聽器
*/
void removeListener(FlowExecutionListener listener);
}
🔧 StepExecutor接口設計
/**
* 步驟執行器接口
* 負責執行特定類型的步驟
*/
public interface StepExecutor {
/**
* 執行步驟
* @param stepDefinition 步驟定義
* @param context 執行上下文
* @return 執行結果
*/
StepExecutionResult execute(StepDefinition stepDefinition, FlowContext context);
/**
* 獲取支持的步驟類型
* @return 步驟類型
*/
StepType getSupportedType();
/**
* 驗證步驟配置
* @param stepDefinition 步驟定義
* @return 驗證結果
*/
default ValidationResult validate(StepDefinition stepDefinition) {
return ValidationResult.success();
}
}
⚙️ DefaultFlowEngine實現
/**
* 默認流程引擎實現
* 提供基礎的流程執行能力
*/
public class DefaultFlowEngine implements FlowEngine {
private static final Logger logger = LoggerFactory.getLogger(DefaultFlowEngine.class);
// 執行器註冊表
private final Map<StepType, StepExecutor> executors = new ConcurrentHashMap<>();
// 執行監聽器列表
private final List<FlowExecutionListener> listeners = new CopyOnWriteArrayList<>();
// 線程池(用於異步執行)
private final ExecutorService executorService;
public DefaultFlowEngine() {
this.executorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("flow-engine-%d")
.setDaemon(true)
.build()
);
// 註冊默認執行器
registerDefaultExecutors();
}
@Override
public FlowExecutionResult execute(FlowDefinition flowDefinition, FlowContext context) {
logger.info("開始執行流程: {} [{}]", flowDefinition.getName(), flowDefinition.getId());
long startTime = System.currentTimeMillis();
FlowExecutionResult.Status status = FlowExecutionResult.Status.SUCCESS;
List<StepExecutionResult> stepResults = new ArrayList<>();
Throwable exception = null;
try {
// 發佈流程開始事件
publishEvent(new FlowStartedEvent(flowDefinition, context));
// 驗證流程定義
validateFlowDefinition(flowDefinition);
// 執行步驟
for (StepDefinition stepDefinition : flowDefinition.getSteps()) {
StepExecutionResult stepResult = executeStep(stepDefinition, context);
stepResults.add(stepResult);
// 如果步驟失敗,根據配置決定是否繼續
if (stepResult.getStatus() == FlowExecutionResult.Status.FAILED) {
if (!shouldContinueOnFailure(stepDefinition)) {
status = FlowExecutionResult.Status.FAILED;
break;
}
}
}
} catch (Exception e) {
logger.error("流程執行異常: {} [{}]", flowDefinition.getName(), flowDefinition.getId(), e);
status = FlowExecutionResult.Status.FAILED;
exception = e;
} finally {
long endTime = System.currentTimeMillis();
// 創建執行結果
FlowExecutionResult result = new FlowExecutionResult(
flowDefinition.getId(),
status,
context,
stepResults,
startTime,
endTime,
exception
);
// 發佈流程完成事件
publishEvent(new FlowCompletedEvent(flowDefinition, context, result));
logger.info("流程執行完成: {} [{}], 狀態: {}, 耗時: {}ms",
flowDefinition.getName(),
flowDefinition.getId(),
status,
endTime - startTime
);
return result;
}
}
@Override
public CompletableFuture<FlowExecutionResult> executeAsync(FlowDefinition flowDefinition, FlowContext context) {
return CompletableFuture.supplyAsync(
() -> execute(flowDefinition, context),
executorService
);
}
/**
* 執行單個步驟
*/
private StepExecutionResult executeStep(StepDefinition stepDefinition, FlowContext context) {
logger.debug("開始執行步驟: {} [{}]", stepDefinition.getName(), stepDefinition.getId());
long startTime = System.currentTimeMillis();
FlowExecutionResult.Status status = FlowExecutionResult.Status.SUCCESS;
Object result = null;
Throwable exception = null;
try {
// 發佈步驟開始事件
publishEvent(new StepStartedEvent(stepDefinition, context));
// 檢查前置條件
if (!checkPrecondition(stepDefinition, context)) {
logger.info("步驟前置條件不滿足,跳過執行: {} [{}]",
stepDefinition.getName(), stepDefinition.getId());
status = FlowExecutionResult.Status.SKIPPED;
return createStepResult(stepDefinition, status, null, startTime, System.currentTimeMillis(), null);
}
// 獲取執行器
StepExecutor executor = getExecutor(stepDefinition.getType());
if (executor == null) {
throw new IllegalStateException("未找到步驟類型的執行器: " + stepDefinition.getType());
}
// 執行步驟
StepExecutionResult stepResult = executor.execute(stepDefinition, context);
status = stepResult.getStatus();
result = stepResult.getResult();
} catch (Exception e) {
logger.error("步驟執行異常: {} [{}]", stepDefinition.getName(), stepDefinition.getId(), e);
status = FlowExecutionResult.Status.FAILED;
exception = e;
} finally {
long endTime = System.currentTimeMillis();
// 創建步驟執行結果
StepExecutionResult stepResult = createStepResult(
stepDefinition, status, result, startTime, endTime, exception
);
// 發佈步驟完成事件
publishEvent(new StepCompletedEvent(stepDefinition, context, stepResult));
logger.debug("步驟執行完成: {} [{}], 狀態: {}, 耗時: {}ms",
stepDefinition.getName(),
stepDefinition.getId(),
status,
endTime - startTime
);
return stepResult;
}
}
/**
* 檢查前置條件
*/
private boolean checkPrecondition(StepDefinition stepDefinition, FlowContext context) {
String precondition = stepDefinition.getPrecondition();
if (precondition == null || precondition.trim().isEmpty()) {
return true;
}
try {
// 使用表達式引擎評估條件(簡化實現)
return evaluateExpression(precondition, context);
} catch (Exception e) {
logger.warn("前置條件評估失敗: {}, 默認為true", precondition, e);
return true;
}
}
/**
* 評估表達式(簡化實現)
*/
private boolean evaluateExpression(String expression, FlowContext context) {
// 這裏是簡化實現,實際項目中應該使用專門的表達式引擎
// 如SpEL、OGNL等
if ("true".equals(expression)) {
return true;
}
if ("false".equals(expression)) {
return false;
}
// 簡單的上下文變量檢查
if (expression.startsWith("context.containsKey(")) {
String key = expression.substring(20, expression.length() - 2);
key = key.replace("'", "").replace("\"", "");
return context.containsKey(key);
}
return true;
}
/**
* 判斷是否在失敗時繼續執行
*/
private boolean shouldContinueOnFailure(StepDefinition stepDefinition) {
Object continueOnFailure = stepDefinition.getConfig().get("continueOnFailure");
return Boolean.TRUE.equals(continueOnFailure);
}
/**
* 創建步驟執行結果
*/
private StepExecutionResult createStepResult(
StepDefinition stepDefinition,
FlowExecutionResult.Status status,
Object result,
long startTime,
long endTime,
Throwable exception) {
return new StepExecutionResult(
stepDefinition.getId(),
stepDefinition.getType(),
status,
result,
startTime,
endTime,
exception
);
}
/**
* 驗證流程定義
*/
private void validateFlowDefinition(FlowDefinition flowDefinition) {
if (flowDefinition == null) {
throw new IllegalArgumentException("流程定義不能為空");
}
if (flowDefinition.getSteps() == null || flowDefinition.getSteps().isEmpty()) {
throw new IllegalArgumentException("流程必須包含至少一個步驟");
}
// 驗證步驟ID唯一性
Set<String> stepIds = new HashSet<>();
for (StepDefinition step : flowDefinition.getSteps()) {
if (!stepIds.add(step.getId())) {
throw new IllegalArgumentException("步驟ID重複: " + step.getId());
}
}
// 驗證步驟配置
for (StepDefinition step : flowDefinition.getSteps()) {
StepExecutor executor = getExecutor(step.getType());
if (executor != null) {
ValidationResult validationResult = executor.validate(step);
if (!validationResult.isValid()) {
throw new IllegalArgumentException(
String.format("步驟配置無效 [%s]: %s",
step.getId(), validationResult.getErrorMessage())
);
}
}
}
}
/**
* 發佈事件
*/
private void publishEvent(FlowExecutionEvent event) {
for (FlowExecutionListener listener : listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
logger.warn("事件監聽器執行異常", e);
}
}
}
/**
* 註冊默認執行器
*/
private void registerDefaultExecutors() {
registerExecutor(StepType.SIMPLE, new SimpleStepExecutor());
registerExecutor(StepType.SERVICE, new ServiceStepExecutor());
registerExecutor(StepType.CONDITIONAL, new ConditionalStepExecutor());
// 其他執行器將在後續章節實現
}
@Override
public void registerExecutor(StepType stepType, StepExecutor executor) {
executors.put(stepType, executor);
logger.info("註冊步驟執行器: {} -> {}", stepType, executor.getClass().getSimpleName());
}
@Override
public StepExecutor getExecutor(StepType stepType) {
return executors.get(stepType);
}
@Override
public void addListener(FlowExecutionListener listener) {
listeners.add(listener);
}
@Override
public void removeListener(FlowExecutionListener listener) {
listeners.remove(listener);
}
/**
* 關閉引擎,釋放資源
*/
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
🔧 基礎執行器實現
🎯 SimpleStepExecutor
/**
* 簡單步驟執行器
* 用於執行基礎的同步操作
*/
public class SimpleStepExecutor implements StepExecutor {
private static final Logger logger = LoggerFactory.getLogger(SimpleStepExecutor.class);
@Override
public StepExecutionResult execute(StepDefinition stepDefinition, FlowContext context) {
logger.debug("執行簡單步驟: {}", stepDefinition.getId());
long startTime = System.currentTimeMillis();
try {
// 獲取配置
String action = (String) stepDefinition.getConfig().get("action");
Object value = stepDefinition.getConfig().get("value");
String targetKey = (String) stepDefinition.getConfig().get("targetKey");
// 執行操作
Object result = executeAction(action, value, context);
// 保存結果到上下文
if (targetKey != null && result != null) {
context.put(targetKey, result);
}
long endTime = System.currentTimeMillis();
return new StepExecutionResult(
stepDefinition.getId(),
stepDefinition.getType(),
FlowExecutionResult.Status.SUCCESS,
result,
startTime,
endTime,
null
);
} catch (Exception e) {
logger.error("簡單步驟執行失敗: {}", stepDefinition.getId(), e);
return new StepExecutionResult(
stepDefinition.getId(),
stepDefinition.getType(),
FlowExecutionResult.Status.FAILED,
null,
startTime,
System.currentTimeMillis(),
e
);
}
}
/**
* 執行具體操作
*/
private Object executeAction(String action, Object value, FlowContext context) {
if ("set".equals(action)) {
return value;
} else if ("get".equals(action)) {
return context.get((String) value);
} else if ("increment".equals(action)) {
String key = (String) value;
Integer current = context.get(key, 0);
return current + 1;
} else if ("sleep".equals(action)) {
try {
Thread.sleep(((Number) value).longValue());
return "slept for " + value + "ms";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Sleep interrupted", e);
}
}
return "action executed: " + action;
}
@Override
public StepType getSupportedType() {
return StepType.SIMPLE;
}
@Override
public ValidationResult validate(StepDefinition stepDefinition) {
// 驗證必要的配置
String action = (String) stepDefinition.getConfig().get("action");
if (action == null || action.trim().isEmpty()) {
return ValidationResult.error("簡單步驟必須指定action配置");
}
return ValidationResult.success();
}
}
🌱 ServiceStepExecutor
/**
* 服務步驟執行器
* 用於調用Spring Bean的方法
*/
public class ServiceStepExecutor implements StepExecutor {
private static final Logger logger = LoggerFactory.getLogger(ServiceStepExecutor.class);
// Bean容器(簡化實現,實際應該注入ApplicationContext)
private final Map<String, Object> beanContainer = new ConcurrentHashMap<>();
@Override
public StepExecutionResult execute(StepDefinition stepDefinition, FlowContext context) {
logger.debug("執行服務步驟: {}", stepDefinition.getId());
long startTime = System.currentTimeMillis();
try {
// 獲取配置
String beanName = (String) stepDefinition.getConfig().get("beanName");
String methodName = (String) stepDefinition.getConfig().get("methodName");
Object[] args = getMethodArguments(stepDefinition, context);
// 獲取Bean實例
Object bean = getBean(beanName);
if (bean == null) {
throw new IllegalStateException("未找到Bean: " + beanName);
}
// 調用方法
Object result = invokeMethod(bean, methodName, args);
// 保存結果到上下文
String resultKey = (String) stepDefinition.getConfig().get("resultKey");
if (resultKey != null && result != null) {
context.put(resultKey, result);
}
long endTime = System.currentTimeMillis();
return new StepExecutionResult(
stepDefinition.getId(),
stepDefinition.getType(),
FlowExecutionResult.Status.SUCCESS,
result,
startTime,
endTime,
null
);
} catch (Exception e) {
logger.error("服務步驟執行失敗: {}", stepDefinition.getId(), e);
return new StepExecutionResult(
stepDefinition.getId(),
stepDefinition.getType(),
FlowExecutionResult.Status.FAILED,
null,
startTime,
System.currentTimeMillis(),
e
);
}
}
/**
* 獲取方法參數
*/
private Object[] getMethodArguments(StepDefinition stepDefinition, FlowContext context) {
@SuppressWarnings("unchecked")
List<String> argKeys = (List<String>) stepDefinition.getConfig().get("arguments");
if (argKeys == null || argKeys.isEmpty()) {
return new Object[0];
}
Object[] args = new Object[argKeys.size()];
for (int i = 0; i < argKeys.size(); i++) {
args[i] = context.get(argKeys.get(i));
}
return args;
}
/**
* 獲取Bean實例
*/
private Object getBean(String beanName) {
return beanContainer.get(beanName);
}
/**
* 調用方法
*/
private Object invokeMethod(Object bean, String methodName, Object[] args) throws Exception {
Class<?> beanClass = bean.getClass();
// 查找匹配的方法
Method method = findMethod(beanClass, methodName, args);
if (method == null) {
throw new NoSuchMethodException(
String.format("未找到方法: %s.%s", beanClass.getSimpleName(), methodName)
);
}
// 調用方法
method.setAccessible(true);
return method.invoke(bean, args);
}
/**
* 查找匹配的方法
*/
private Method findMethod(Class<?> clazz, String methodName, Object[] args) {
Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (method.getName().equals(methodName) &&
method.getParameterCount() == args.length) {
return method;
}
}
return null;
}
/**
* 註冊Bean
*/
public void registerBean(String name, Object bean) {
beanContainer.put(name, bean);
logger.info("註冊Bean: {} -> {}", name, bean.getClass().getSimpleName());
}
@Override
public StepType getSupportedType() {
return StepType.SERVICE;
}
@Override
public ValidationResult validate(StepDefinition stepDefinition) {
String beanName = (String) stepDefinition.getConfig().get("beanName");
String methodName = (String) stepDefinition.getConfig().get("methodName");
if (beanName == null || beanName.trim().isEmpty()) {
return ValidationResult.error("服務步驟必須指定beanName配置");
}
if (methodName == null || methodName.trim().isEmpty()) {
return ValidationResult.error("服務步驟必須指定methodName配置");
}
return ValidationResult.success();
}
}
📊 事件系統設計
🎯 FlowExecutionEvent
/**
* 流程執行事件基類
*/
public abstract class FlowExecutionEvent {
private final long timestamp;
private final FlowDefinition flowDefinition;
private final FlowContext context;
protected FlowExecutionEvent(FlowDefinition flowDefinition, FlowContext context) {
this.timestamp = System.currentTimeMillis();
this.flowDefinition = flowDefinition;
this.context = context;
}
// getter方法...
}
/**
* 流程開始事件
*/
public class FlowStartedEvent extends FlowExecutionEvent {
public FlowStartedEvent(FlowDefinition flowDefinition, FlowContext context) {
super(flowDefinition, context);
}
}
/**
* 流程完成事件
*/
public class FlowCompletedEvent extends FlowExecutionEvent {
private final FlowExecutionResult result;
public FlowCompletedEvent(FlowDefinition flowDefinition, FlowContext context, FlowExecutionResult result) {
super(flowDefinition, context);
this.result = result;
}
public FlowExecutionResult getResult() {
return result;
}
}
/**
* 步驟開始事件
*/
public class StepStartedEvent extends FlowExecutionEvent {
private final StepDefinition stepDefinition;
public StepStartedEvent(StepDefinition stepDefinition, FlowContext context) {
super(null, context); // 簡化實現
this.stepDefinition = stepDefinition;
}
public StepDefinition getStepDefinition() {
return stepDefinition;
}
}
/**
* 步驟完成事件
*/
public class StepCompletedEvent extends FlowExecutionEvent {
private final StepDefinition stepDefinition;
private final StepExecutionResult result;
public StepCompletedEvent(StepDefinition stepDefinition, FlowContext context, StepExecutionResult result) {
super(null, context); // 簡化實現
this.stepDefinition = stepDefinition;
this.result = result;
}
// getter方法...
}
🎧 FlowExecutionListener
/**
* 流程執行監聽器接口
*/
public interface FlowExecutionListener {
/**
* 處理執行事件
* @param event 執行事件
*/
void onEvent(FlowExecutionEvent event);
/**
* 流程開始時調用
* @param event 流程開始事件
*/
default void onFlowStarted(FlowStartedEvent event) {
// 默認空實現
}
/**
* 流程完成時調用
* @param event 流程完成事件
*/
default void onFlowCompleted(FlowCompletedEvent event) {
// 默認空實現
}
/**
* 步驟開始時調用
* @param event 步驟開始事件
*/
default void onStepStarted(StepStartedEvent event) {
// 默認空實現
}
/**
* 步驟完成時調用
* @param event 步驟完成事件
*/
default void onStepCompleted(StepCompletedEvent event) {
// 默認空實現
}
}
🧪 使用示例
public class FlowEngineExample {
public static void main(String[] args) {
// 創建流程引擎
DefaultFlowEngine engine = new DefaultFlowEngine();
// 註冊測試Bean
ServiceStepExecutor serviceExecutor = (ServiceStepExecutor) engine.getExecutor(StepType.SERVICE);
serviceExecutor.registerBean("testService", new TestService());
// 添加監聽器
engine.addListener(new LoggingFlowExecutionListener());
// 創建流程定義
FlowDefinition flow = FlowDefinitionBuilder
.builder("test-flow")
.name("測試流程")
.addStep(
StepDefinitionBuilder
.builder("init", StepType.SIMPLE)
.config("action", "set")
.config("value", "Hello World")
.config("targetKey", "message")
.build()
)
.addStep(
StepDefinitionBuilder
.serviceStep("process", "testService", "processMessage")
.config("arguments", Arrays.asList("message"))
.config("resultKey", "result")
.build()
)
.addStep(
StepDefinitionBuilder
.builder("output", StepType.SIMPLE)
.config("action", "get")
.config("value", "result")
.build()
)
.build();
// 創建執行上下文
FlowContext context = new DefaultFlowContext();
// 執行流程
FlowExecutionResult result = engine.execute(flow, context);
// 輸出結果
System.out.println("執行狀態: " + result.getStatus());
System.out.println("執行時間: " + (result.getEndTime() - result.getStartTime()) + "ms");
System.out.println("上下文數據: " + context.getAll());
// 關閉引擎
engine.shutdown();
}
/**
* 測試服務類
*/
public static class TestService {
public String processMessage(String message) {
return "Processed: " + message;
}
}
/**
* 日誌監聽器
*/
public static class LoggingFlowExecutionListener implements FlowExecutionListener {
@Override
public void onEvent(FlowExecutionEvent event) {
if (event instanceof FlowStartedEvent) {
onFlowStarted((FlowStartedEvent) event);
} else if (event instanceof FlowCompletedEvent) {
onFlowCompleted((FlowCompletedEvent) event);
} else if (event instanceof StepStartedEvent) {
onStepStarted((StepStartedEvent) event);
} else if (event instanceof StepCompletedEvent) {
onStepCompleted((StepCompletedEvent) event);
}
}
@Override
public void onFlowStarted(FlowStartedEvent event) {
System.out.println("🚀 流程開始: " + event.getFlowDefinition().getName());
}
@Override
public void onFlowCompleted(FlowCompletedEvent event) {
System.out.println("✅ 流程完成: " + event.getResult().getStatus());
}
@Override
public void onStepStarted(StepStartedEvent event) {
System.out.println(" 🔧 步驟開始: " + event.getStepDefinition().getName());
}
@Override
public void onStepCompleted(StepCompletedEvent event) {
System.out.println(" ✅ 步驟完成: " + event.getResult().getStatus());
}
}
}
🎯 設計亮點
✨ 1. 責任分離
- 🎯 引擎專注調度: FlowEngine只負責流程調度和狀態管理
- 🔧 執行器專注執行: StepExecutor專注於具體步驟的執行邏輯
- 📊 清晰的接口: 每個組件都有明確的職責邊界
✨ 2. 可擴展架構
- 🔌 插件化執行器: 可以輕鬆添加新的步驟類型
- 🎧 事件驅動: 通過事件系統實現鬆耦合的擴展
- ⚙️ 配置驅動: 通過配置而非代碼控制行為
✨ 3. 健壯性設計
- 🛡️ 異常處理: 完善的異常捕獲和處理機制
- ✅ 參數驗證: 在執行前驗證配置的有效性
- 📊 狀態跟蹤: 詳細記錄每個步驟的執行狀態
✨ 4. 性能考慮
- ⚡ 異步支持: 提供異步執行能力
- 🧵 線程安全: 使用線程安全的數據結構
- 📈 性能監控: 記錄執行時間等性能指標
🎉 小結
在這一章中,我們完成了:
- ⚙️ 實現了FlowEngine: 核心的流程執行引擎
- 🔧 設計了StepExecutor: 可擴展的步驟執行器架構
- 🎯 實現了基礎執行器: SimpleStepExecutor和ServiceStepExecutor
- 📊 設計了事件系統: 支持流程執行的監控和擴展
- 🧪 提供了完整示例: 展示了引擎的使用方法
🚀 下一步
在下一章《步驟執行器設計》中,我們將:
- 🔧 深入設計更多類型的執行器
- 🔀 實現條件分支執行器
- ⚡ 實現並行執行器
- 🔄 實現循環執行器
- 🧪 編寫完整的執行器測試
💡 思考題:
- 如何設計才能支持步驟的超時控制?
- 如何實現流程的暫停和恢復功能?
- 如何設計才能支持分佈式環境下的流程執行?