大家好,我是半夏之沫 😁😁 一名金融科技領域的JAVA系統研發😊😊
我希望將自己工作和學習中的經驗以最樸實,最嚴謹的方式分享給大家,共同進步👉💓👈
👉👉👉👉👉👉👉👉💓寫作不易,期待大家的關注和點贊💓👈👈👈👈👈👈👈👈
👉👉👉👉👉👉👉👉💓關注微信公眾號【技術探界】 💓👈👈👈👈👈👈👈👈
前言
在日誌-log4j2日誌框架源碼學習一文中,對Log4j2的整體結構和同步打印流程進行了一個初步學習,本篇文章將對Log4j2異步打印流程進行學習。在同步日誌打印中,應用業務邏輯與日誌打印在一個線程中,應用後續業務邏輯需要等待日誌打印完成才能繼續執行,而異步日誌打印中,應用業務邏輯與日誌打印在不同線程中,Log4j2有專門的線程來完成日誌打印而不會阻塞應用後續業務邏輯的執行。Log4j2提供了兩種方式來配置異步日誌打印,分別是AsyncAppender和AsyncLogger,本篇文章先對AsyncAppender進行學習。
Log4j2版本:2.17.1
正文
首先需要搭建示例工程,在pom文件中引入Log4j2的依賴。
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
</dependency>
</dependencies>
再創建一個打印日誌的測試類,如下所示。
public class LearnLog4j2Async {
private static final Logger logger = LoggerFactory
.getLogger(LearnLog4j2Async.class);
public static void main(String[] args) {
logger.info("{} be happy every day.", "Lee");
}
}
使用AsyncAppender的方式來配置異步日誌打印,是基於<Async>標籤來配置並得到異步Appender,打印日誌時待打印的日誌會被添加到異步Appender的阻塞隊列中,然後由一個專門的後台線程消費阻塞隊列中的待打印日誌,這裏的阻塞隊列是ArrayBlockingQueue。使用AsyncAppender的方式的配置的一個例子如下所示。
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<!-- 配置兩個正常的Appender -->
<Console name="MyConsole" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%msg%n"/>
</Console>
<RollingFile name="MyFile" fileName="mylog.log"
filePattern="mylog.log.%i">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" />
<PatternLayout pattern="%msg%n"/>
<SizeBasedTriggeringPolicy size="20M"/>
</RollingFile>
<!-- 讓異步Appender引用正常Appender -->
<Async name="MyAsync">
<AppenderRef ref="MyConsole"/>
<AppenderRef ref="MyFile"/>
</Async>
</Appenders>
<Loggers>
<!-- 讓根日誌打印器引用異步Appender -->
<Root level="INFO">
<Appender-ref ref="MyAsync"/>
</Root>
</Loggers>
</Configuration>
通過上述的配置,在測試程序中的logger會繼承使用根日誌打印器的LoggerConfig,也就持有了name為MyAsync的異步Appender,那麼在打印日誌時,最終會調用到異步Appender的append()方法,這裏實際會調用到AsyncAppender的append()方法,如下所示。
public void append(final LogEvent logEvent) {
if (!isStarted()) {
throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
}
// 將MutableLogEvent轉換為Log4jLogEvent
// includeLocation控制是否將打印日誌的代碼行號傳遞給Log4jLogEvent
final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
// 調用transfer()方法將待打印日誌添加到阻塞隊列中
if (!transfer(memento)) {
if (blocking) {
if (AbstractLogger.getRecursionDepth() > 1) {
AsyncQueueFullMessageUtil.logWarningToStatusLogger();
// 如果阻塞隊列已滿且正處於遞歸調用中,則在當前線程中直接調用Appender打印日誌
logMessageInCurrentThread(logEvent);
} else {
// 如果阻塞隊列已滿但不處於遞歸調用中,則根據配置的AsyncQueueFullPolicy來決定如何處理該條日誌
final EventRoute route = asyncQueueFullPolicy.getRoute(dispatcher.getId(), memento.getLevel());
route.logMessage(this, memento);
}
} else {
error("Appender " + getName() + " is unable to write primary appenders. queue is full");
logToErrorAppenderIfNecessary(false, memento);
}
}
}
在上述append()方法中,實際完成的事情就是將待打印日誌添加到阻塞隊列中,這裏的阻塞隊列是ArrayBlockingQueue,如果添加到阻塞隊列成功,則append()方法執行完畢,如果失敗,則會再判斷當前是否是處於遞歸調用中,如果是處於遞歸調用中,則直接在當前線程中調用Appender來完成日誌打印,這樣的日誌打印是一個同步打印,如果沒有處於遞歸調用中,則會使用配置的AsyncQueueFullPolicy來決定如何處理該條日誌,根據配置的AsyncQueueFullPolicy的不同,處理策略有丟棄,排隊,阻塞或者直接打印等。
到這裏有兩個地方還需要進行討論,一是上述append()方法完成的事情是在應用線程裏往阻塞隊列offer()待打印日誌,那麼從阻塞隊列裏消費待打印日誌的打印線程是什麼時候啓動又如何工作的呢;二是當阻塞隊列滿時,配置的AsyncQueueFullPolicy具體會如何處理待打印日誌。下面將對上述兩個問題繼續進行討論。
已知Log4j2框架在首次獲取Logger時,會初始化LoggerContext,而初始化LoggerContext時有一個步驟就是將Log4j2配置對象XmlConfiguration設置給LoggerContext並啓動XmlConfiguration,這裏看一下XmlConfiguration的start()方法,如下所示。
public void start() {
if (getState().equals(State.INITIALIZING)) {
initialize();
}
LOGGER.debug("Starting configuration {}", this);
this.setStarting();
if (watchManager.getIntervalSeconds() >= 0) {
watchManager.start();
}
if (hasAsyncLoggers()) {
asyncLoggerConfigDisruptor.start();
}
final Set<LoggerConfig> alreadyStarted = new HashSet<>();
for (final LoggerConfig logger : loggerConfigs.values()) {
logger.start();
alreadyStarted.add(logger);
}
// 遍歷配置文件中配置的Appender並啓動
for (final Appender appender : appenders.values()) {
appender.start();
}
if (!alreadyStarted.contains(root)) {
root.start();
}
super.start();
LOGGER.debug("Started configuration {} OK.", this);
}
在XmlConfiguration的start()方法中,會遍歷配置文件中配置的所有Appender並啓動,在本小節的示例配置文件中,配置的Appender如下所示。
那麼配置的異步Appender也會在這裏被調用start()方法,其實現如下所示。
public void start() {
// 拿到配置的所有Appender的map
final Map<String, Appender> map = config.getAppenders();
final List<AppenderControl> appenders = new ArrayList<>();
// 遍歷異步Appender的AppenderRef,並根據AppenderRef去所有Appender的map中拿到為異步Appender引用的Appender
for (final AppenderRef appenderRef : appenderRefs) {
final Appender appender = map.get(appenderRef.getRef());
if (appender != null) {
appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
} else {
LOGGER.error("No appender named {} was configured", appenderRef);
}
}
if (errorRef != null) {
final Appender appender = map.get(errorRef);
if (appender != null) {
errorAppender = new AppenderControl(appender, null, null);
} else {
LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
}
}
if (appenders.size() > 0) {
// 創建AsyncAppenderEventDispatcher
// AsyncAppenderEventDispatcher實際上是一個Thread
dispatcher = new AsyncAppenderEventDispatcher(
getName(), errorAppender, appenders, queue);
} else if (errorRef == null) {
throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
}
// 創建隊列滿時的處理策略
asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
// 啓動AsyncAppenderEventDispatcher線程
dispatcher.start();
super.start();
}
AsyncAppender的start()方法首先會根據配置文件中的<AppenderRef>標籤的值將給AsyncAppender引用的Appender獲取出來,然後和阻塞隊列一起創建AsyncAppenderEventDispatcher,該類的類圖如下所示。
所以AsyncAppenderEventDispatcher本質是一個Thread對象,因此調用其start()方法後,AsyncAppenderEventDispatcher的run()方法就會運行起來,看一下AsyncAppenderEventDispatcher的run()方法的實現,如下所示。
public void run() {
LOGGER.trace("{} has started.", getName());
// 開始消費阻塞隊列中的日誌並打印
dispatchAll();
// 線程如果被停止,則調用dispatchRemaining()保證阻塞隊列中的日誌全部被打印
dispatchRemaining();
}
private void dispatchAll() {
while (!stoppedRef.get()) {
LogEvent event;
try {
// 從阻塞隊列消費日誌,隊列如果為空則一直等待直到有日誌為止
event = queue.take();
} catch (final InterruptedException ignored) {
// 重新設置中斷標誌位
interrupt();
break;
}
// 只有在調用了stop()方法後,才會往阻塞隊列中添加STOP_EVENT
// 所以消費到STOP_EVENT後,就退出循環
if (event == STOP_EVENT) {
break;
}
event.setEndOfBatch(queue.isEmpty());
dispatch(event);
}
LOGGER.trace("{} has stopped.", getName());
}
void dispatch(final LogEvent event) {
// 將日誌分發給引用的每個Appender進行打印
boolean succeeded = false;
for (int appenderIndex = 0; appenderIndex < appenders.size(); appenderIndex++) {
final AppenderControl control = appenders.get(appenderIndex);
try {
control.callAppender(event);
succeeded = true;
} catch (final Throwable error) {
LOGGER.trace(
"{} has failed to call appender {}",
getName(), control.getAppenderName(), error);
}
}
if (!succeeded && errorAppender != null) {
try {
errorAppender.callAppender(event);
} catch (final Throwable error) {
LOGGER.trace(
"{} has failed to call the error appender {}",
getName(), errorAppender.getAppenderName(), error);
}
}
}
由上述代碼可知,專門有一個線程(AsyncAppenderEventDispatcher)循環的從阻塞隊列中消費日誌,每消費一條日誌,就會將該條日誌分發給引用的Appender進行打印,本示例中,AsyncAppender引用的Appender有name為MyConsole的ConsoleAppender和name為MyFile的RollingFileAppender,所以每條被消費的日誌都會被這兩個Appender打印。那麼至此就知道了消費阻塞隊列中的日誌的線程的啓動是在LoggerContext初始化時啓動每個Appender的時候。
現在回到AsyncAppender的start()方法,其中有一行代碼是創建阻塞隊列滿時的AsyncQueueFullPolicy策略類,如下所示。
asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
所以繼續跟進AsyncQueueFullPolicyFactory的create()方法,如下所示。
public static AsyncQueueFullPolicy create() {
// 從系統變量中將log4j2.AsyncQueueFullPolicy獲取出來
final String router = PropertiesUtil.getProperties().getStringProperty(PROPERTY_NAME_ASYNC_EVENT_ROUTER);
// 如果沒有配置log4j2.AsyncQueueFullPolicy的值或其值為Default,則使用DefaultAsyncQueueFullPolicy作為阻塞隊列滿時的策略類
if (router == null || isRouterSelected(
router, DefaultAsyncQueueFullPolicy.class, PROPERTY_VALUE_DEFAULT_ASYNC_EVENT_ROUTER)) {
return new DefaultAsyncQueueFullPolicy();
}
// 如果配置的log4j2.AsyncQueueFullPolicy的值為Discard,則使用DiscardingAsyncQueueFullPolicy作為阻塞隊列滿時的策略類
if (isRouterSelected(
router, DiscardingAsyncQueueFullPolicy.class, PROPERTY_VALUE_DISCARDING_ASYNC_EVENT_ROUTER)) {
return createDiscardingAsyncQueueFullPolicy();
}
// 如果配置的log4j2.AsyncQueueFullPolicy的值既不是Default也不是Discard
// 則認為這個值是某個AsyncQueueFullPolicy接口的實現類的全限定名,使用這個類作為阻塞隊列滿時的策略類
return createCustomRouter(router);
}
由上可知,阻塞隊列滿時的策略類是由配置的log4j2.AsyncQueueFullPolicy的值來決定,當配置為Default時,策略類為DefaultAsyncQueueFullPolicy,當配置為Discard時,策略類為DiscardingAsyncQueueFullPolicy,如果既不是Default,也不是Discard,則log4j2.AsyncQueueFullPolicy的值應該是AsyncQueueFullPolicy接口的某個實現類的全限定名,此時使用這個實現類作為阻塞隊列滿時的策略類。
最後分析一下DefaultAsyncQueueFullPolicy和DiscardingAsyncQueueFullPolicy這兩個策略類的實現,首先是DefaultAsyncQueueFullPolicy,其getRoute()方法如下所示。
public EventRoute getRoute(final long backgroundThreadId, final Level level) {
Thread currentThread = Thread.currentThread();
// 如果當前線程是後台線程,則直接在當前線程中調用Appender打印日誌
if (currentThread.getId() == backgroundThreadId
|| currentThread instanceof Log4jThread) {
return EventRoute.SYNCHRONOUS;
}
// 否則調用阻塞隊列的put()方法來將日誌添加到阻塞隊列中,這一操作會阻塞當前線程
return EventRoute.ENQUEUE;
}
DefaultAsyncQueueFullPolicy策略類的策略很簡單,判斷當前線程是否是後台線程(AsyncAppenderEventDispatcher),如果是的話,就直接調用Appender來打印日誌,如果不是則調用阻塞隊列的put()方法來將日誌添加到阻塞隊列中,這一操作將會阻塞當前線程。下面看一下DiscardingAsyncQueueFullPolicy的getRoute()方法,如下所示。
public EventRoute getRoute(final long backgroundThreadId, final Level level) {
// 如果當前日誌級別小於或等於閾值日誌級別,則丟棄該條日誌
if (level.isLessSpecificThan(thresholdLevel)) {
if (discardCount.getAndIncrement() == 0) {
LOGGER.warn("Async queue is full, discarding event with level {}. " +
"This message will only appear once; future events from {} " +
"are silently discarded until queue capacity becomes available.",
level, thresholdLevel);
}
return EventRoute.DISCARD;
}
// 如果當前日誌級別大於閾值日誌級別,則調用DefaultAsyncQueueFullPolicy策略類來處理該條日誌
return super.getRoute(backgroundThreadId, level);
}
DiscardingAsyncQueueFullPolicy首先會判斷當前這條日誌的級別是否小於或等於閾值日誌級別,如果是則丟棄這條日誌,閾值日誌級別由系統變量log4j2.DiscardThreshold配置,如果沒有配置則默認的閾值日誌級別是INFO,如果當前這條日誌的級別大於閾值日誌級別,則使用DefaultAsyncQueueFullPolicy默認策略類來處理這條日誌。實際上DiscardingAsyncQueueFullPolicy繼承於DefaultAsyncQueueFullPolicy,在默認策略的基礎上擴展增強了根據閾值日誌級別來丟棄日誌的策略。
至此,基於AsyncAppender進行異步日誌打印的源碼分析完畢。
總結
當使用AsyncAppender來異步打印日誌時,每個AsyncAppender持有一個ArrayBlockingQueue,並且每個AsyncAppender還會啓動一個後台線程來消費阻塞隊列中的待打印日誌,後台線程每消費一條待打印日誌,就會遍歷當前AsyncAppender持有的Appender來完成日誌打印。
基於AsyncAppender的異步日誌打印流程圖如下所示。
大家好,我是半夏之沫 😁😁 一名金融科技領域的JAVA系統研發😊😊
我希望將自己工作和學習中的經驗以最樸實,最嚴謹的方式分享給大家,共同進步👉💓👈
👉👉👉👉👉👉👉👉💓寫作不易,期待大家的關注和點贊💓👈👈👈👈👈👈👈👈
👉👉👉👉👉👉👉👉💓關注微信公眾號【技術探界】 💓👈👈👈👈👈👈👈👈