1. 概述
在許多其他教程中,我們已經討論過 BeanPostProcessor。在本教程中,我們將利用它在一個實際的例子中使用 Guava 的 EventBus。
Spring 的 BeanPostProcessor 提供了對 Spring Bean 生命週期鈎子的訪問,從而可以修改其配置。
BeanPostProcessor 允許直接修改 Bean 本身。
在本教程中,我們將探討這些類與 Guava 的 EventBus 集成的一個具體示例。
2. 環境搭建
首先,我們需要搭建我們的環境。我們將 Spring 上下文、Spring 表達式 和 Guava 依賴添加到我們的 <em pom.xml</em> 中:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>接下來,我們來討論一下我們的目標。
3. 目標與實現
為了我們的第一個目標,我們希望利用 Guava 的 EventBus 來實現系統各個方面之間的異步消息傳遞。
接下來,我們希望在 Bean 創建/銷燬時自動註冊和取消註冊對象,而不是使用 EventBus 提供的手動方法。
所以,我們現在準備好開始編碼了!
我們的實現將包括 Guava 的 EventBus 的包裝類、自定義標記註解、一個 BeanPostProcessor、一個模型對象和一個 Bean,用於接收來自 EventBus 的股票交易事件。 此外,我們還將創建一個測試用例以驗證所需的功能。
3.1. <em >EventBus</em> 包裝器
首先,我們將定義一個 <em >EventBus</em> 包裝器,以提供靜態方法,方便為事件註冊和取消註冊 Bean,這些 Bean 將被 <em >BeanPostProcessor</em> 使用。
public final class GlobalEventBus {
public static final String GLOBAL_EVENT_BUS_EXPRESSION
= "T(com.baeldung.postprocessor.GlobalEventBus).getEventBus()";
private static final String IDENTIFIER = "global-event-bus";
private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());
private GlobalEventBus() {}
public static GlobalEventBus getInstance() {
return GlobalEventBus.GLOBAL_EVENT_BUS;
}
public static EventBus getEventBus() {
return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
}
public static void subscribe(Object obj) {
getEventBus().register(obj);
}
public static void unsubscribe(Object obj) {
getEventBus().unregister(obj);
}
public static void post(Object event) {
getEventBus().post(event);
}
}這段代碼提供了訪問 GlobalEventBus 和其底層 EventBus 的靜態方法,以及註冊和取消註冊事件訂閲,以及發佈事件的功能。它還使用了 SpEL 表達式,作為我們自定義註解中的默認表達式,用於定義我們希望使用的 EventBus。
3.2. 自定義標記註解
接下來,我們定義一個自定義標記註解,該註解將由 BeanPostProcessor 用於識別需要自動註冊/取消註冊事件的 Bean:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface Subscriber {
String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
}3.3. `BeanPostProcessor</h3
現在,我們將定義 BeanPostProcessor,它將檢查每個 Bean 是否具有 Subscriber 註解。 此類也是一個 DestructionAwareBeanPostProcessor,這是一個 Spring 接口,添加了在 Bean 被銷燬之前的回調。 如果存在該註解,我們將將其註冊到通過註解的 SpEL 表達式標識的 EventBus 上,並在 Bean 被銷燬時註銷它:
public class GuavaEventBusBeanPostProcessor
implements DestructionAwareBeanPostProcessor {
Logger logger = LoggerFactory.getLogger(this.getClass());
SpelExpressionParser expressionParser = new SpelExpressionParser();
@Override
public void postProcessBeforeDestruction(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::unregister, "destruction");
}
@Override
public boolean requiresDestruction(Object bean) {
return true;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::register, "initialization");
return bean;
}
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
// See implementation below
}
}代碼上方將每個 Bean 都通過 process 方法運行,該方法定義如下。它在 Bean 初始化後,在 Bean 被銷燬之前進行處理。 requiresDestruction 方法默認返回 true,我們在此保留該行為,因為我們檢查了 @Subscriber 註解的存在於 postProcessBeforeDestruction 回調中。
現在讓我們來看一下 process 方法:
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
Object proxy = this.getTargetObject(bean);
Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
if (annotation == null)
return;
this.logger.info("{}: processing bean of type {} during {}",
this.getClass().getSimpleName(), proxy.getClass().getName(), action);
String annotationValue = annotation.value();
try {
Expression expression = this.expressionParser.parseExpression(annotationValue);
Object value = expression.getValue();
if (!(value instanceof EventBus)) {
this.logger.error(
"{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
return;
}
EventBus eventBus = (EventBus)value;
consumer.accept(eventBus, proxy);
} catch (ExpressionException ex) {
this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getName());
}
}這段代碼檢查是否存在我們自定義的標記註解 Subscriber,如果存在,則從其 value 屬性中讀取 SpEL 表達式。然後,該表達式會被解析為一個對象。如果該對象是 EventBus 的實例,則將 BiConsumer 函數參數應用於該 Bean。 BiConsumer 用於在 EventBus 中註冊和取消註冊該 Bean。
方法 getTargetObject 的實現如下:
private Object getTargetObject(Object proxy) throws BeansException {
if (AopUtils.isJdkDynamicProxy(proxy)) {
try {
return ((Advised)proxy).getTargetSource().getTarget();
} catch (Exception e) {
throw new FatalBeanException("Error getting target of JDK proxy", e);
}
}
return proxy;
}3.4. StockTrade 模型對象
接下來,讓我們定義我們的 StockTrade 模型對象:
public class StockTrade {
private String symbol;
private int quantity;
private double price;
private Date tradeDate;
// constructor
}3.5. StockTradePublisher 事件接收器
然後,我們定義一個監聽器類,以便在收到交易信息時通知我們,從而編寫我們的測試:
@FunctionalInterface
public interface StockTradeListener {
void stockTradePublished(StockTrade trade);
}最後,我們將定義一個用於新 StockTrade 事件的接收器:
@Subscriber
public class StockTradePublisher {
Set<StockTradeListener> stockTradeListeners = new HashSet<>();
public void addStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.add(listener);
}
}
public void removeStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.remove(listener);
}
}
@Subscribe
@AllowConcurrentEvents
void handleNewStockTradeEvent(StockTrade trade) {
// publish to DB, send to PubNub, ...
Set<StockTradeListener> listeners;
synchronized (this.stockTradeListeners) {
listeners = new HashSet<>(this.stockTradeListeners);
}
listeners.forEach(li -> li.stockTradePublished(trade));
}
}上述代碼將此類標記為 Guava EventBus 事件的 Subscriber,Guava 的 @Subscribe 註解將方法 handleNewStockTradeEvent 標記為事件接收器。它接收的事件類型基於方法參數的類類型;在本例中,我們將接收 StockTrade 類型事件。
@AllowConcurrentEvents 註解允許併發調用此方法。收到交易後,我們執行任何我們希望執行的任何處理,然後通知任何監聽器。
3.6. 測試
現在,讓我們用一個集成測試來總結我們的編碼工作,以驗證 BeanPostProcessor 正確工作。首先,我們需要一個 Spring 上下文:
@Configuration
public class PostProcessorConfiguration {
@Bean
public GlobalEventBus eventBus() {
return GlobalEventBus.getInstance();
}
@Bean
public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
return new GuavaEventBusBeanPostProcessor();
}
@Bean
public StockTradePublisher stockTradePublisher() {
return new StockTradePublisher();
}
}現在我們可以實現我們的測試:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = PostProcessorConfiguration.class)
public class StockTradeIntegrationTest {
@Autowired
StockTradePublisher stockTradePublisher;
@Test
public void givenValidConfig_whenTradePublished_thenTradeReceived() {
Date tradeDate = new Date();
StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
AtomicBoolean assertionsPassed = new AtomicBoolean(false);
StockTradeListener listener = trade -> assertionsPassed
.set(this.verifyExact(stockTrade, trade));
this.stockTradePublisher.addStockTradeListener(listener);
try {
GlobalEventBus.post(stockTrade);
await().atMost(Duration.ofSeconds(2L))
.untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
} finally {
this.stockTradePublisher.removeStockTradeListener(listener);
}
}
boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
return Objects.equals(stockTrade.getSymbol(), trade.getSymbol())
&& Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate())
&& stockTrade.getQuantity() == trade.getQuantity()
&& stockTrade.getPrice() == trade.getPrice();
}
}上述測試代碼生成股票交易並將其發佈到 GlobalEventBus。 我們最多等待兩秒鐘以確認操作已完成並收到交易已通過 stockTradePublisher 發佈的通知。 此外,我們驗證接收到的交易在傳輸過程中未被修改。
4. 結論
綜上所述,Spring 的 BeanPostProcessor 允許我們自定義 Bean 本身,從而為我們提供了一種自動化 Bean 操作的手段,否則我們需要手動完成這些操作。