动态

详情 返回 返回

什麼是 rxjs 的 replaySubject - 动态 详情

rxjs 是一個強大的庫,為我們提供了豐富的功能來處理異步數據流。在這些功能中,ReplaySubject 是一個非常有用的類,它在多種情況下表現突出。

ReplaySubjectSubject 的一種變體。與 Subject 類似,它是一個多播的 Observable,允許多個 Observer 訂閲。然而,它有一個顯著的不同點:它會緩存一定數量的值,並將這些值重新發射給所有新的訂閲者。我們可以設定緩存的最大值數量或緩存的時間窗口。通過這種方式,ReplaySubject 使得新加入的訂閲者能夠接收到 Observable 中先前發射的值,即使這些值是在它們訂閲之前發射的。

接下來通過詳細分析其源代碼來理解其工作原理。

rxjs 的源代碼中,ReplaySubject 實際上是繼承自 Subject 的:

class ReplaySubject<T> extends Subject<T> {
  private buffer: T[] = [];
  private bufferSize: number;
  private windowTime: number;
  private scheduler: SchedulerLike;

  constructor(bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, scheduler?: SchedulerLike) {
    super();
    this.bufferSize = bufferSize < 1 ? 1 : bufferSize;
    this.windowTime = windowTime < 1 ? 1 : windowTime;
    this.scheduler = scheduler;
  }
}

ReplaySubject 的構造函數接受三個參數:

  1. bufferSize:緩存的最大值數量,默認值為 Number.POSITIVE_INFINITY
  2. windowTime:緩存的時間窗口,默認值為 Number.POSITIVE_INFINITY,表示緩存的時間跨度是無限的。
  3. scheduler:可選的調度器,用於時間管理。

關鍵方法之一是 next 方法,它是用於發射一個值:

next(value?: T): void {
  const now = this.scheduler ? this.scheduler.now() : Date.now();
  this.buffer.push({ value, timestamp: now });

  this.trimBufferThenGetEvents();

  super.next(value);
}

這個方法除了調用 super.next(value) 以外,它還將值以及時間戳緩存到 buffer 中,並調用 trimBufferThenGetEvents 方法來維護緩存。

trimBufferThenGetEvents 方法是核心部分,它用來修剪緩存,確保它遵循 bufferSizewindowTime 的限制:

private trimBufferThenGetEvents() {
  const now = this.scheduler ? this.scheduler.now() : Date.now();
  const bufferSize = this.bufferSize;
  const windowTime = this.windowTime;
  const buffer = this.buffer;

  let events = buffer.slice();

  if (bufferSize < Number.POSITIVE_INFINITY) {
    events = events.slice(-(bufferSize));
  }

  if (windowTime < Number.POSITIVE_INFINITY) {
    events = events.filter(event => (now - event.timestamp) < windowTime);
  }

  this.buffer = events;
}

每次有新的值發射時,這個方法會對緩存進行修剪,確保其不超過設定的 bufferSizewindowTime

當訂閲者訂閲 ReplaySubject 時,他們會收到已經緩存的事件:

_subscribe(subscriber: Subscriber<T>): Subscription {
  const events = this.trimBufferThenGetEvents();
  for (let i = 0; i < events.length && !subscriber.closed; i++) {
    subscriber.next(events[i].value);
  }
  return super._subscribe(subscriber);
}

這個方法首先調用 trimBufferThenGetEvents 修剪緩衝區,然後將緩衝區中的所有事件發送給新訂閲者。

下面是一個示例,展示 ReplaySubject 的基本用法:

import { ReplaySubject } from 'rxjs';

// 創建一個 ReplaySubject,緩存最近 2 個值
const replaySubject = new ReplaySubject(2);

// 訂閲該 ReplaySubject
replaySubject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

// 發射 3 個值
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);

// 這裏 observerB 會接收到最近的 2 個值 2 和 3
replaySubject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

// 依然會接收到值
replaySubject.next(4);

在這個例子中,ReplaySubject 緩存最近 2 個值。當 observerB 訂閲時,它會接收到值 23。發射值 4 時,兩個訂閲者都接收到了。

ReplaySubject 的使用場景

  1. 緩存數據:當您希望新的訂閲者能夠立即接收到最新的數據(可能是在它們訂閲之前發射的),可以使用 ReplaySubject。例如在實時聊天應用中,當新用户加入會話時,您可能希望他們看到最近的幾條消息。
  2. 重複播放:在某些情況下,您希望重播某個數據序列。例如,音視頻播放器的實現中,如果需要實現某種形式的回放功能,可以考慮採用 ReplaySubject 來緩存數據流並再現。
  3. 緩衝最新的信息:當某些數據流速特別快且訂閲者可能會錯過某些信息時,可以使用 ReplaySubject 緩存這些信息,使得訂閲者不會錯過重要的信息。
  4. 保存最近狀態:在 Redux 或其他狀態管理工具中,您可能想要保存最近的狀態,以便在用户重新打開頁面時能夠恢復狀態,可以使用 ReplaySubject 來實現保存和恢復狀態的功能。

下一步,通過一個更復雜的例子來展示 ReplaySubject 的更多功能,這裏演示一個場景,即顯示最近推送的三條數據,並每隔一段時間推送新數據:

import { ReplaySubject } from 'rxjs';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// 創建一個 ReplaySubject,緩存最近 3 個值
const replaySubject = new ReplaySubject(3);

// 訂閲該 ReplaySubject
replaySubject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

// 使用 interval 生成一個數字流,每隔 1 秒發射一個值,最多發射 5 個值
interval(1000).pipe(take(5)).subscribe(value => replaySubject.next(value));

// 延時 3 秒後再加入新訂閲者
setTimeout(() => {
  replaySubject.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 3000);

在這個例子中,observerA 會從 04 收到所有值。observerB 延時 3 秒之後訂閲,由於 ReplaySubject 記住了最近的 3 個值,observerB 會收到值 2, 3, 4

綜上所述,ReplaySubjectrxjs 中非常有用的工具,為我們提供了一種緩存和重播數據流的方式。瞭解它的工作原理和使用場景,及其源代碼的分析,能讓我們在實際項目中更好地利用它來解決複雜的異步處理問題。

user avatar monkeynik 头像 tssc 头像 best_6455a509a2177 头像 evans_bo 头像
点赞 4 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.