博客 / 詳情

返回

《底層到底做了什麼》--- reactor 模型裏的flux

引言:

spring cloud gateway --> webflux --> reactor-netty --> reactor-core

以上是github上幾個項目的依賴關係。

説明:

reactor-core是reactive programming模型的一個具體實現。

本文簡單説明在reactor-core中,flux工作底層到底做了什麼。

先説給一個完整的demo代碼

    Flux<String> data = Flux.just("hello", "hello2");//1
    data = data.map(e -> e + " world"); //2
    data.subscribe( e ->  System.out.println(e) );//3


1、Flux<String> data = Flux.just("hello", "hello2");

這個方法內部創建一個FluxArray對象,把數據存儲到array參數裏。

2、data = data.map(e -> e + " world");

這個方法內部創建一個FluxMapFuseable對象,把FluxArray對象存儲到source參數裏,把(e -> e + " world")存儲到mapper。

這個方法邏輯就是通過把原來的數據和新的處理邏輯,一層一層的封裝起來。

3、data.subscribe( e -> System.out.println(e) );

在方法的底層調用的是Flux類的subscribe()方法,比較複雜,下面展開説明。

subscribe的具體代碼

     OptimizableOperator operator = (OptimizableOperator)publisher;             
     while(true) {
                subscriber = operator.subscribeOrReturn(subscriber);//1
                if (subscriber == null) {
                    return;
                }

                OptimizableOperator newSource = operator.nextOptimizableSource();//2
                if (newSource == null) {
                    publisher = operator.source();
                    break;
                }

                operator = newSource;
            }
        }

        publisher.subscribe(subscriber);//3

參數:publisher就是上面的FluxMapFuseable對象,subscriber就是上面( 的(e -> System.out.println(e) )。

1、subscriber = operator.subscribeOrReturn(subscriber);

這個方法內部MapFuseableSubscriber創建一個FluxMapFuseable對象,把subscriber對象存儲到actual參數裏,把FluxMapFuseable對象裏的map,也就是(e -> e + " world")存儲到mapper。

這個方法與生成FluxMapFuseable對象類似,之前是把原始的array一層一層的套上map,現在反過來把FluxMapFuseable對象上的map一層一層解開,再套到subscriber上。

2、OptimizableOperator newSource = operator.nextOptimizableSource();

這個方法比較好理解了,就是獲取FluxMapFuseable對象的裏面一層。

3、publisher.subscribe(subscriber);

出了while循環,開始真正執行 subscribe。

參數:publisher就是原始的fluxArray數據,subscriber是一層一層的map,最外層是(e -> e + " world"),最裏層是(e -> System.out.println(e) )。

subscribe()方法裏,先調用fluxArray的fastPath(這個方法名先忽略含義),裏面會對每個array元素調用subscriber的onNext方法。onNext方法調用map中的(e -> e + " world")處理數據,然後交給裏一層subscriber,繼續調用onNext方法,最終到(e -> System.out.println(e) )。

到此,調用完成。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.