博客 / 詳情

返回

Flutter--關於流你需要知道的

寫代碼有時候就像坐過山車一樣,當你在有如神助開心搬磚的時候,突然間又手足無措不知道該如何是好。這種情況還循環往復,有時候一天都這樣,有時候整個你的開發生涯都差不多這樣。

尤其在面對Stream的時候這樣的情況更加明顯。Stream的很多概念會讓你覺得很簡單,有些有會讓你抓不到要點,尤其對於Dart或者Flutter的新手的時候。為什麼會這樣的呢?這是因為Strem實在是太過基礎,比如很多感知設備發出來的信號,一些狀態管理工具甚至於Dart的isolate等都是以Stream為基礎。

但是,Stream也沒有難到“蜀道難”的程度,只需要稍加留心和多練習就可以掌握它。

我們先來看一個簡單的例子:

Stream<int> countStream(int to) async* { // 1
  for (int i = 1; i <= to; i++) {
    await Future.delayed(const Duration(seconds: 1)); // 2
    yield i; // 3
  }
}
  1. 使用async*返回一個流,這個流的類型是Stream<int>
  2. 在這裏延遲一秒鐘。
  3. yield在流裏面發出一個值。

在這個簡單的例子裏面製造了一個流,這個流每隔一秒鐘發出一個數值。在StreamBuilder中可以消費這個方法返回的流,具體是這樣的:

  // 在StatefulWidget裏
  final _countStream = countStream(100); // 1

  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          title: const Text("STREAM"),
        ),
        body: StreamBuilder(    // 2
            stream: _countStream,
            builder: (context, snapshot) {
              if (snapshot.hasError) {  // 3
                return Center(
                  child: Text("error ${snapshot.error}"),
                );
              } else {
                switch (snapshot.connectionState) {  // 3
                  case ConnectionState.none:
                    return const Center(
                      child: Text("None"),
                    );
                  case ConnectionState.waiting:
                    return const Center(
                      child: Text("Waiting..."),
                    );
                  case ConnectionState.active:
                    return Center(
                      child: Text("Active: ${snapshot.data}"),  // 4
                    );
                  case ConnectionState.done:
                    return Center(
                      child: Text("Active: ${snapshot.data}"),
                    );
                }
              }
            }));
  }
  1. 在StatefulWidget裏的成員狀態final _countStream = countStream(100);
  2. 使用StreamBuilder來消費_countStream流。
  3. StreamBuildersnapshot獲取流執行的狀態。
  4. 使用snapshot的流數據。

什麼是流(Stream)

alt text

什麼是流。就是一個或者多個事件,不斷的發射(到一個管道里)。在(管道)另一端的監聽器可以消費這些事件。在上面的例子中,countStream會不斷的發射事件,StreamBuilder可以消費這些事件。

流和Future類似,也是dart實現異步操作的方式之一。Future可以異步地返回數據,異常之後停止執行。流也類似,可以異步發出一串數據(或者異常)。

流是怎麼工作的

一般來説,流會把數據從管道的一段發到另一端。在這個管道上可以有一個、多個的監聽器訂閲這些數據。這些監聽器根據某些條件對這些數據做一些操作。

具體如何使用流呢?

  1. 使用已有的流
  2. 新建流

流,我們有一些瞭解了。具體在Dart使用流也有一些條件。

  1. 單定流(single-subscription stream)
  2. 廣播流

新建流

使用已有的流生成流

使用已有的流非常常見。比如,你用了第三方庫、某些基於流的狀態管理工具就會遇到直接生成的流。

這裏沒有第三方庫,直接生成一個流。再用這個流。

import 'dart:async';

void main() {
  // 新建一個整數流
  final Stream<int> originalStream = Stream<int>.fromIterable([1, 2, 3, 4, 5]); // 1

  // 使用map轉化流
  final Stream<int> transformedStream = originalStream.map((int value) { // 2
    return value * 2; // Double each integer
  });

  // 訂閲監聽剛剛轉化的流
  final StreamSubscription<int> subscription = // 3
      transformedStream.listen((int value) {
    print('Transformed value: $value');
  });

  // 取消訂閲
  Future.delayed(Duration(seconds: 1), () { // 4
    subscription.cancel();
  });
}
  1. 直接生成一個整數流。
  2. 使用一個已經生成的流,就是剛剛生成的整數流。使用map轉化整數流,每個流的值翻倍。
  3. 訂閲轉化的流。
  4. 取消訂閲。注意,在使用流的時候不用的流需要取消,否則會造成內存泄漏。

使用生成器生成流

這個類型的方法就是countStream所使用的的方法。使用async*表明這個方法返回的是一個流。在方法的內部使用yield向流內部發射值。

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    await Future.delayed(const Duration(seconds: 1));
    yield i;
  }
}

使用StreamController生成流

相比之下,StreamController比生成器更加好用。

Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // 向流發射數據
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // 關閉流,並通知所有訂閲者
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick); // 3
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>( // 1
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream; // 2
}
  1. 初始化一個StreamController。在構造這個流控制器的參數都是流控制器的回調。在監聽、暫停、resume和取消的時候需要代碼如何處理具體的問題都可以設置回調處理。
  2. 返回控制器的流。
  3. 在訂閲流的時候控制器回調可以在onListen中開始向流發射數據。注意: 流的數據可以包括具體的業務數據,也可以包含異常。

StreamController提供了創建流的很多便利,但是也有一個問題需要注意。如果StreamController的準備好發射了,沒有訂閲者,那麼這些數據會緩存,從而導致內存泄露。所以,沒訂閲,不發射是流使用的一條黃金規則。

使用流

使用流,就是訂閲流。訂閲了之後就會收到流發射出來的各種數據。

對於Dart使用流的限制條件需要詳細瞭解,否則就算在StreamBuilder這麼簡單是使用環境也會報錯。

Dart默認的流是單定流

單定流(single-subscription stream)在其生命週期中,至允許存在一個訂閲者,或者是監聽器。就算取消了一個已經存在的訂閲者,也不能再訂閲了。如果強行訂閲,報錯:Bad State

  StreamController<int> streamController = StreamController<int>(); // 1

  StreamSubscription<int> subscription = streamController.stream.listen( // 2
    (int data) {
      print('Received data: $data');
    },
  );

  subscription.cancel(); // 3

  subscription = streamController.stream.listen( // 4
    (int data) {
      print('Received data again: $data');
    },
  );

  streamController.close(); // 5

最終會報錯!

  1. 初始化一個流控制器。
  2. 訂閲流。
  3. 取消流的訂閲。
  4. 再次訂閲流。
  5. 關閉流。

單點流的這個特點對於數據完整性和順序有要求的需求非常有用。比如解析Http的請求,讀一個文件,或者處理聊天app的消息。

單點流只有在訂閲之後才會開始發出數據,在訂閲者取消點閲之後數據就不會再發出。即使還有數據需要發出。

但是,如果需要多個訂閲者呢?

如果你需要在app的多個部分都訂閲一個流,如果你的app多個部分需要在一個事件發生之後同時做出反應呢?這就需要廣播流了!

廣播流

廣播流可以有多個訂閲者。而且不管有沒有訂閲者,只要準備就緒就會開始不斷的發出數據。廣播流發出的數據沒有順序的要求。訂閲者完全可以在收到數據之後就做出對應的處理。比如,頭條新聞、球賽比分或者天氣預報之類。這樣似乎看起來多少有點浪費資源。所以在使用廣播流的時候要加些小心,否則可能會導致內存泄漏。

所以,在訂閲者收到了done事件之後會好取消訂閲。

import 'dart:async';

void main() {
  // 1
  StreamController<int> broadcastController = StreamController<int>.broadcast();

  // 2
  StreamSubscription<int> subscription1 = broadcastController.stream.listen(
    (data) {
      print('Listener 1 Received: $data');
    },
  );

  // 3
  StreamSubscription<int> subscription2 = broadcastController.stream.listen(
    (data) {
      print('Listener 2 Received: $data');
    },
  );

  // 4
  broadcastController.add(1);
  broadcastController.add(2);
  broadcastController.add(3);

  // 5
  broadcastController.close();
}
  1. 使用StreamBuilder<T>.broadcast()來新建一個廣播流控制器,這個控制器返回的stream就是廣播流了。
  2. 第二、三步訂閲廣播流。
  3. 訂閲廣播流。
  4. 給流添加數據(事件)。
  5. 關閉流。

在Flutter的實例。在stream_page.dart文件中,給組件添加一個控制器成員。可以控制這個控制器出的初始化方法,用構造函數就是單定流,用broadcast就是廣播流。

給這個流控制器兩個StreamBuilder,讓這兩個builder訂閲流。這樣可以對比單定流和廣播流在多個訂閲下的反應是什麼。

class StreamPage extends StatefulWidget {
  StreamPage({super.key});

  final controller = StreamController<int>(); // *

  @override
  State<StatefulWidget> createState() => _StreamPageState();
}

在widget成員里加流控制器。現在這個流控制器使用的是單定流。下面看看單定流在StreamBuilder的使用情況:

不過首先需要在這個按鈕中給這個扣控制器添加數據(事件):

ElevatedButton(
  onPressed: () async { // 1
    if (mounted) {
      setState(() {
        _countStream2 = widget.controller.stream; // 2
      });
    }

    for (int i = 1; i <= 10; i++) {
      await Future.delayed(const Duration(seconds: 1)); // 3
      widget.controller.add(i);
    }
  },
  child: const Text("Start stream")
)
  1. 需要實現和async*方法生成流的方式一樣的功能,每隔一秒發射一個數字。所以這裏需要用async
  2. 給狀態成員一個流,也可以在initState裏直接賦值。或者初始化的時候直接賦值。這裏主要説明,在async事件中使用setState的時候需要先判斷mounted屬性。否則會有警告。
  3. 延遲一秒發射數字。

運行效果,報錯:Bad state。這也説明單定流只能有一個訂閲。這裏的效果可能會受到hot reload的影響。

final controller = StreamController<int>();換成final controller = StreamController<int>.broadcast();。再次運行代碼,一切正常運行。

不要忘記在dispose方法中關閉流。

  @override
  void dispose() {
    widget.controller.close();
    super.dispose();
  }

異常處理

await for來説,流數據會不斷的發射數據,一直到全部的數據都發射完畢。但是,不巧遇到了問題,比如網絡下載的文件突然就斷了之類的。這時候流也會停止。Dart的流也很類似,在遇到第一個錯誤的時候就停止執行。當然也有例外,稍後討論。

import 'dart:async';

void main() {
  // 1
  StreamController<int> streamController = StreamController<int>();

  // 2
  StreamSubscription<int> subscription = streamController.stream.listen(
    (int data) {
      print('Received data: $data');
    },
    onError: (error) {
      print('Error occurred: $error');
    },
    onDone: () {
      print('Stream is done.');
    },
  );

  // 3
  streamController.add(1);
  streamController.add(2);
  throw Exception("Error");
  streamController.add(3);

  // 5
  streamController.close();
}

運行結果只會有1和2兩個數字。

  1. 初始化一個流控制器。
  2. 訂閲流。
  3. 給流添加數據
  4. 添加一個異常
  5. 關閉流

在Widget裏執行:

  for (int i = 1; i <= 10; i++) {
    await Future.delayed(const Duration(seconds: 1));
    if (i == 5) {
      // widget.controller.addError('Error with num $i');
      throw Exception("Number is $i"); // *
    } else {
      widget.controller.add(i);
    }
  }

拋出異常的時候,流停止發射值。

如果發生了異常的時候,還想要流繼續執行的話可以這樣:

 if (i == 3) yield* () async* { throw Exception(); }();
  // 或者:      yield* Stream.fromFuture(Future.error(Exception());
  // 或者:      yield* Stream.error(Exception()); 
  // 或者:     controller.addError('錯誤描述'); // 這個是在使用流控制器的時候

總之就是不要讓異常把流擊穿了,而是讓異常變成了流要發射的值的一部分。

最後

瞭解了流的基礎知識之後就要開始基於流的狀態管理了。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.