rxjs 是一個強大的庫,為我們提供了豐富的功能來處理異步數據流。在這些功能中,ReplaySubject 是一個非常有用的類,它在多種情況下表現突出。
ReplaySubject 是 Subject 的一種變體。與 Subject 類似,它是一個多播的 Observable,允許多個 Observer 訂閲。然而,它有一個顯著的不同點:它會緩存一定數量的值,並將這些值重新發射給所有新的訂閲者。我們可以設定緩存的最大值數量或緩存的時間窗口。通過這種方式,ReplaySubject 使得新加入的訂閲者能夠接收到 Observable 中先前發射的值,即使這些值是在它們訂閲之前發射的。
接下來通過詳細分析其源代碼來理解其工作原理。
在 rxjs 的源代碼中,ReplaySubject 實際上是繼承自 Subject 的:
class ReplaySubject<T> extends Subject<T> {
private buffer: T[] = [];
private bufferSize: number;
private windowTime: number;
private scheduler: SchedulerLike;
constructor(bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, scheduler?: SchedulerLike) {
super();
this.bufferSize = bufferSize < 1 ? 1 : bufferSize;
this.windowTime = windowTime < 1 ? 1 : windowTime;
this.scheduler = scheduler;
}
}
ReplaySubject 的構造函數接受三個參數:
bufferSize:緩存的最大值數量,默認值為Number.POSITIVE_INFINITY。windowTime:緩存的時間窗口,默認值為Number.POSITIVE_INFINITY,表示緩存的時間跨度是無限的。scheduler:可選的調度器,用於時間管理。
關鍵方法之一是 next 方法,它是用於發射一個值:
next(value?: T): void {
const now = this.scheduler ? this.scheduler.now() : Date.now();
this.buffer.push({ value, timestamp: now });
this.trimBufferThenGetEvents();
super.next(value);
}
這個方法除了調用 super.next(value) 以外,它還將值以及時間戳緩存到 buffer 中,並調用 trimBufferThenGetEvents 方法來維護緩存。
trimBufferThenGetEvents 方法是核心部分,它用來修剪緩存,確保它遵循 bufferSize 和 windowTime 的限制:
private trimBufferThenGetEvents() {
const now = this.scheduler ? this.scheduler.now() : Date.now();
const bufferSize = this.bufferSize;
const windowTime = this.windowTime;
const buffer = this.buffer;
let events = buffer.slice();
if (bufferSize < Number.POSITIVE_INFINITY) {
events = events.slice(-(bufferSize));
}
if (windowTime < Number.POSITIVE_INFINITY) {
events = events.filter(event => (now - event.timestamp) < windowTime);
}
this.buffer = events;
}
每次有新的值發射時,這個方法會對緩存進行修剪,確保其不超過設定的 bufferSize 和 windowTime。
當訂閲者訂閲 ReplaySubject 時,他們會收到已經緩存的事件:
_subscribe(subscriber: Subscriber<T>): Subscription {
const events = this.trimBufferThenGetEvents();
for (let i = 0; i < events.length && !subscriber.closed; i++) {
subscriber.next(events[i].value);
}
return super._subscribe(subscriber);
}
這個方法首先調用 trimBufferThenGetEvents 修剪緩衝區,然後將緩衝區中的所有事件發送給新訂閲者。
下面是一個示例,展示 ReplaySubject 的基本用法:
import { ReplaySubject } from 'rxjs';
// 創建一個 ReplaySubject,緩存最近 2 個值
const replaySubject = new ReplaySubject(2);
// 訂閲該 ReplaySubject
replaySubject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// 發射 3 個值
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
// 這裏 observerB 會接收到最近的 2 個值 2 和 3
replaySubject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
// 依然會接收到值
replaySubject.next(4);
在這個例子中,ReplaySubject 緩存最近 2 個值。當 observerB 訂閲時,它會接收到值 2 和 3。發射值 4 時,兩個訂閲者都接收到了。
ReplaySubject 的使用場景
- 緩存數據:當您希望新的訂閲者能夠立即接收到最新的數據(可能是在它們訂閲之前發射的),可以使用
ReplaySubject。例如在實時聊天應用中,當新用户加入會話時,您可能希望他們看到最近的幾條消息。 - 重複播放:在某些情況下,您希望重播某個數據序列。例如,音視頻播放器的實現中,如果需要實現某種形式的回放功能,可以考慮採用
ReplaySubject來緩存數據流並再現。 - 緩衝最新的信息:當某些數據流速特別快且訂閲者可能會錯過某些信息時,可以使用
ReplaySubject緩存這些信息,使得訂閲者不會錯過重要的信息。 - 保存最近狀態:在 Redux 或其他狀態管理工具中,您可能想要保存最近的狀態,以便在用户重新打開頁面時能夠恢復狀態,可以使用
ReplaySubject來實現保存和恢復狀態的功能。
下一步,通過一個更復雜的例子來展示 ReplaySubject 的更多功能,這裏演示一個場景,即顯示最近推送的三條數據,並每隔一段時間推送新數據:
import { ReplaySubject } from 'rxjs';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
// 創建一個 ReplaySubject,緩存最近 3 個值
const replaySubject = new ReplaySubject(3);
// 訂閲該 ReplaySubject
replaySubject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// 使用 interval 生成一個數字流,每隔 1 秒發射一個值,最多發射 5 個值
interval(1000).pipe(take(5)).subscribe(value => replaySubject.next(value));
// 延時 3 秒後再加入新訂閲者
setTimeout(() => {
replaySubject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 3000);
在這個例子中,observerA 會從 0 到 4 收到所有值。observerB 延時 3 秒之後訂閲,由於 ReplaySubject 記住了最近的 3 個值,observerB 會收到值 2, 3, 4。
綜上所述,ReplaySubject 是 rxjs 中非常有用的工具,為我們提供了一種緩存和重播數據流的方式。瞭解它的工作原理和使用場景,及其源代碼的分析,能讓我們在實際項目中更好地利用它來解決複雜的異步處理問題。