最近系統需要做一個日誌平台,對所有接入的系統進行日誌的統計分析,因為之前用的是kafka來實現各業務系統日誌接入日誌平台的,所以想到了直接使用kafka官方本身提供的一個實時計算框架kafka stream。
kafka stream的時間窗口有兩個重要的屬性:窗口大小和步長(移動間隔),滾動窗口Tumbling Time Window:步長等於窗口大小,滾動窗口是沒有記錄的重疊;跳躍窗口Hopping Time Window:步長不等於窗口大小。
我們的需求是要求預警每天從0點到24點時間段內發生操作或查詢次數過多的記錄,之前我用的是滾動窗口,窗口大小為一天,不過我看了kafka的默認實現,窗口設置在.windowedBy(TimeWindows.of(Duration.ofDays(1))),TimeWindows對象裏面主要的方法就是public Map<Long, TimeWindow> windowsFor(final long timestamp) {},根據記錄的時間戳來判斷是屬於哪個窗口,默認代碼為
@Override
public Map<Long, TimeWindow> windowsFor(final long timestamp) {
long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
while (windowStart <= timestamp) {
final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
windows.put(windowStart, window);
windowStart += advanceMs;
}
return windows;
}
該實現的窗口時間段是從8點到第二天的8點為一天,而不是需求要求的0點到24點,於是我重新實現了一個類OffsetTimeWindows
@Override
public Map<Long, TimeWindow> windowsFor(final long timestamp) {
long windowStart = timestamp - (timestamp + offset) % sizeMs; //取得當前時間戳那天0點的時間戳
final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
while (windowStart <= timestamp) {
final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
windows.put(windowStart, window);
windowStart += advanceMs;
}
return windows;
}
其中增加了一個offset的參數,可以在初始化這個類的時候進行賦值,以達到自定義任意時間段的效果,我是需要0點,所以該offset我設置為28800000,經過測試,能夠完美實現該效果。
在實現該需求的過程中,我發現flink的客户端有直接提供設置偏移量的窗口類TumblingEventTimeWindows,而kafka本身是沒有實現的,目前看起來flink是功能更完備一些的。