Stories

Detail Return Return

rust進階.併發.Tokio.1.Tokio簡介 - Stories Detail

學習要,工作也不能拉下,所以這一段時間關於rust的博文少了些。

rust要學習的內容還很多,但我覺得應該優先打好基礎,這其中比較關注的是併發。

提到rust的併發,先回憶在書本<<rust編程語言>>有許多的內容:

1.併發和並行

2.通過信道(channel)共享進程間數據

關鍵庫和方法

std::sync::mpsc
mpsc::channel()
std::thread
thread::spawn()
 
3.通過內存共享進程間數據

關鍵庫和方法

std::sync::{Arc, Mutex}
std::thread
std::thread::JoinHandle
JoinHandle::join()
 
4.Send和Sync特質
Send 標記 trait 表明實現了 Send 的類型值的所有權可以在線程間傳送。幾乎所有的 Rust 類型都是Send 的,
 *   不過有一些例外,包括 Rc<T>,這是因為Rc的fetch_add(引用計數)屬於非原子操作
Sync 標記 trait 表明一個實現了 Sync 的類型可以安全的在多個線程中擁有其值的引用

5.core::future::future --未來值

 * 1.future 是一個現在可能還沒有準備好但將在未來某個時刻準備好的值
 * 2.Rust 提供了 Future trait 作為基礎組件,這樣不同的異步操作就可以在不同的數據結構上實現
 * 3.每一個實現了 Future 的類型會維護自己的進度狀態信息和 “ready” 的定義
 * 4.async 關鍵字可以用於代碼塊和函數
 * 5.在一個 async 塊或 async 函數中,可以使用 await 關鍵字來等待一個 future 準備就緒,這一過程稱為 等待一個 future
 * 6.檢查一個 future 並查看其值是否已經準備就緒的過程被稱為 輪詢(polling)
 * 7.在大多數情況下,編寫異步 Rust 代碼時,我們使用 async 和 await 關鍵字。
 *    Rust 將其編譯為等同於使用 Future trait 的代碼,這非常類似於將 for 循環編譯為等同於使用 Iterator trait 的代碼

 

雖然rust的標準庫已經可以解決併發的問題,但是異步操作據説還是需要依賴於第三方的運行時來進行,這其中有幾個可以選擇:

運行時庫 異步操作支持 Future 支持 性能表現 易用性 生態兼容性 適用場景
Tokio 多線程工作竊取調度器,支持百萬級併發連接 豐富組合子(join!, select!),兼容 async-std ★★★★★(行業標杆) ★★★☆(中等學習曲線) ★★★★★(Hyper/Tonic 等主流框架) 高性能網絡服務(API 網關、實時通信)
async-std 類似標準庫的異步 API(async_std::fs 等) std::future 完全兼容,支持 Futures Unordered ★★★☆(輕量高效) ★★★★★(零學習成本) ★★★☆(基礎工具鏈完善) 快速原型開發、CLI 工具、輕量 Web 服務
Smol 輕量級任務模型(類似 goroutine) 模塊化設計,可通過 async-channel 擴展 ★★★★☆(低資源佔用) ★★★★☆(簡潔 API) ★★☆(需依賴 async-executors 微服務、IoT 設備、極簡依賴鏈項目
Glommio 基於 io_uring 的線程親和性調度器 實驗性 Future API,支持 FutureExt 擴展 ★★☆(延遲波動) ★☆(底層 API) ★☆(無成熟 HTTP 庫) 研究性項目、探索新技術邊界
rustasync/runtime 底層抽象,支持自定義線程池/調度策略 靈活但複雜,需直接操作 Runtime::builder() ★★★☆(可調優) ★☆(高級開發者專用) ★★☆(需自行集成生態) 深度定製需求、自定義異步框架

從上表可以看出,tokio相對比較突出,尤其是性能和生態方面,而這時應用開發所需要關注的。

 

一、tokio發展歷史

Tokio 的誕生與 Rust 異步生態的演進緊密相連:

  1. 起源與整合(2016年)
    Tokio 最初是 futures(異步操作基本 API)和 mio(跨平台非阻塞 IO 庫)的整合庫,名為 futures_mio。2016 年 8 月更名為 Tokio,計劃構建一整套異步設施,核心組件包括 tokio-core(單線程模型)、tokio-iotokio-timer 等。

  2. 性能與易用性優化

    • 引入宏實現 async/await 語法(如 futures-await 庫),提升異步代碼的可讀性。
    • 通過 tokio-proto 等模塊提供上層協議支持,但後續因易用性問題被逐步優化。
  3. 生態定位與標準化(2018年)
    Rust 官方成立 net-wg 小組,將 futures 工作移交,並倡導“中立的 futures 提供基礎能力,Tokio 專注 IO 相關接口”的設計。Tokio 雖未完全採納此方案,但持續演進,成為事實標準的異步運行時。

  4. 現狀與影響
    Tokio 憑藉高性能(如支持百萬級併發連接)、豐富工具鏈(異步文件/網絡/定時器)和生態整合(如 Hyper、Axum 框架),成為 Rust 異步開發的首選方案。

命名由來:為什麼叫 Tokio?

Tokio 的命名靈感來源於 “Tokyo(東京)+ IO” 的組合,寓意:

  • 高效與繁忙:東京作為國際化大都市,象徵着高效處理海量任務的能力,與 Tokio 處理異步 IO 的目標契合。
  • 技術願景:項目初期目標是構建像東京都市圈一樣“繁忙卻高效”的異步運行時。

Tokio 的字面意思

  • 日語語境:Tokio 在日語中沒有實際含義,它是“東京”的西班牙語拼寫變體。日語中“東京”的標準表達為平假名「とうきょう」或漢字「東京」。
  • 語言差異:西班牙語採用“Tokio”拼寫東京,源於其語音規則(如保留詞尾 -o),而日語官方羅馬字轉寫為“Tokyo”。Tokio 這一拼寫主要出現在西班牙語系國家或國際交流場景中。

總結

Tokio 的命名既體現了技術願景(高效處理 IO),也藴含了文化隱喻(東京的國際化形象)。其發展歷程反映了 Rust 異步生態從底層整合到標準化、易用化的演進,最終成為高性能異步開發的基石。

二、tokio官方文檔概要

以下內容來自tokio的文檔。

通過: cargo doc --open --package tokio 能打開英文版本。

以下內容是對這個cargo doc的翻譯。

 

2.1、tokio旅程

編寫應用

tokio很適合用於編寫應用,在這種情況下使用人不需要擔憂所需要採用的tokio特性。

如果您不確定,那麼我們建議你使用“full"以確保在構建應用過程中不會有什麼阻滯。(是的,在這個年代磁盤已經不經不值錢了)

以下代碼會安裝tokio的所有模塊:

tokio = { version = "1", features = ["full"] }

 

2.1.1、編寫庫

如果是編寫庫,自然以提供最輕便的單元包為目標。為了達成這個目標,你應該確保只使用了你需要的特性(模塊)。如此,庫的用户就可以不要啓用一些無用的特性。

例如:

tokio = { version = "1", features = ["rt", "net"] }

 

2.1.2、使用任務(task)

rust的異步編程基於輕量、非阻塞的執行單元,這些執行單元稱為tasks.

tokio::task模塊提供了重要的工具一些和tasks一起工作:

  • spawn函數和JoinHandle類型-前者負責基於tokio運行時調度一個新任務,後者則等待任務的輸出
  • 用於異步任務中運行阻塞操作的函數

tokio::task模塊在特性"rt"啓用的時候才會提供。

tokio::sync包含了同步的功能,以便在需要的時候交流或者共享數據。這些功能包括:

  • 通道(oneshot,mpsc,watch和broadcast),用於在任務之間發送值
  • 非阻塞Mutex(互斥鎖),用於控制對一個共享可變值得存取
  • 一個異步Barrier類型(屏障),用於多個任務在開始計算前進行同步

tokio::sync只有特性"sync"啓用得時候才會提供。

 

tokio::time模塊提供了用於追蹤時間和調度工作的許多工具。包括設置任務的超時(timeouts),休眠(sleeping)將來要運行的工作,或者

以特定的間隔重複操作。

為了使用tokio:time,必須啓用"time"特性。

 

最後,tokio提供了執行異步任務的運行時。大部分應用可以使用#[tokio::main]宏,這樣應用就可以基於tokio運行時運行

然而,這個宏僅僅提供了基本的配置項。 作為一個替代,tokio:runtime模塊提供了更多的強大API,這些api能夠配置和管理運行時。

如果#[tokio:main]宏無法滿足需要,就應該使用這些api。

 

要使用這個運行時,必須啓用"rt"或者"rt-multi-thread"特性,這樣才可以分別開啓當前線程的single-threaded scheduler(單線程調度器)和multi-thread scheduler(多線程調度器)。

runtime module documentation提供了更多的細節。 

此外,特性"macros"啓用了#[tokio::main]和#[tokio::test]屬性。

 

 

2.1.3、cpu綁定任務和阻塞代碼

tokio能夠基於一些線程同時運行許多任務(線程池),方式是重複喚起每個線程當前運行任務。

然而,這種喚起只能基於.await關鍵點,因此耗費較長運行時間的代碼,如果沒有遇到遇到.await,那麼它們會阻止其它任務的運行。為了解決這個問題,tokio提供了兩種線程:核心線程和阻塞線程。

 

核心線程用於運行異步代碼,tokio默認為一個cpu內核喚起一個內核線程。但我們可以使用環境變量TOKIO_WORKER_THREADS覆蓋默認值。

 

阻塞線程則是根據需要喚起,能用於運行阻塞代碼。這些代碼會阻止其它任務運行,並讓它自己保持活躍(即使一段時間沒有用),這個保持活躍的功能可以通過thread_keep_alive進行配置。

由於tokio無法換出一個阻塞的任務,就像它可以和異步代碼一起工作(?),阻塞線程數上限非常大。這個上限可以同故宮Builder進行配置。

為了喚起一個阻塞任務,我們應該使用spawn_blocking函數。

#[tokio::main]
async fn main() {
    // This is running on a core thread.

    let blocking_task = tokio::task::spawn_blocking(|| {
        // This is running on a blocking thread.
        // Blocking here is ok.
    });

    // We can wait for the blocking task like this:
    // If the blocking task panics, the unwrap below will propagate the
    // panic.
    blocking_task.await.unwrap();
}

 

如果我們的代碼和cpu密切關聯,那麼我們應該會希望限制cpu上運行的線程數。因此我們應該用一個單獨的線程池來處理和cpu關係密切的任務。例如,我們可以考慮使用rayon庫。它也能創建額外的tokio運行時,用於處理cpu關係密切的任務,但是如果我們這麼做,就應該謹慎一些,因為這些額外的運行時候只運行cpu密切的任務,如果運行IO密切的任務,那麼表現不理想。

提示:如果使用rayon,我們可以創建一個通道,當rayon任務完成時,用於把結果發送回tokio。

 

2.1.4、異步IO

tokio能夠調度和運行任務,而且也提供用於異步處理io的每一樣東西。

tokio::io模塊提供了tokio異步核心io功能,包括AsyncRead,AsyncWrite,AsyncBufRead特質。

此外,當啓用了"io-util"特性後,tokio也提供了和這些特質相關的組合體和方法,並構建一個異步的組件給std::io。

 

tokio也包含了執行各種io的api,api和操作系統做異步的交互,它們包括:

  • tokio::net-包含了非阻塞版本的TCP,UDP和UNIX Domain Socket(要求啓用net特新)
  • tokio::fs -類似std::fs,用於異步處理文件系統io,要求啓用特性fs
  • tokio::signal-用於異步處理unix和windows的型號,要求啓用signal特性
  • tokio::process-用於喚起和管理字進程,要求啓用process特性

 

2.2、一個簡單的示例

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // In a loop, read data from the socket and write the data back.
            loop {
                let n = match socket.read(&mut buf).await {
                    // socket closed
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                    }
                };

                // Write the data back
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {:?}", e);
                    return;
                }
            }
        });
    }
}

 

這個例子使用的tokio的異步io,包括net和io。

每當接到一個socket連接,就回啓動一個任務,這個任務回讀取並寫回內容到socket中。

 

2.3、特徵/特性標記

分為不可靠和可靠兩個部分。

注意,當前文章基於tokio的1.45.0版本。

編碼 名稱 啓用説明 是否穩定
full 所有 全部  
rt 運行時 tokio::spawn,當前線程調度器和非調度器工具
rt-multi-thread 多線程運行時 更重的多線程,工作竊取調度器(work-stealing scheduler)?
io-util io工具 基於IO的Ext特質等
io-std 標準IO Stdout,Stdin和Stderr類型
net 網絡 tokio::net類型(TpcStream,UnixStream,UdpSocket),還有AsyncFd(Unix類系統,可用於linux)和PollAio(FreeBsd)
time 時間 tokio::time類型,計時器
process 進程 tokio::proecss類型
macros tokio::main和tokio::test宏
sync 同步 tokio:sync類型
signal 信號 tokio::signal類型
fs 文件系統 tokio::fs類型
test-util 測試工具 tokio運行時測試框架
parking_lot 特車位?  
tracing 追蹤 要求啓用構建標記tokio_unstable
其它  
  • [task::Builder]
  • Some methods on task::JoinSet
  • runtime::RuntimeMetrics
  • [runtime::Builder::on_task_spawn]
  • [runtime::Builder::on_task_terminate]
  • [runtime::Builder::unhandled_panic]
  • [runtime::TaskMeta]

要求啓用構建標記tokio_unstable

特別注意:AsyncRead and AsyncWrite  這兩個特質總是可用,即使沒有申明(即這是最基本的部分)

 

如何在構建的時候設置tokio_unstable

在項目的 .cargo/config.toml中配置

[build]
rustflags = ["--cfg", "tokio_unstable"]

此外,通過環境變量配置也可以。

windows

$Env:RUSTFLAGS="--cfg tokio_unstable"

linux

export RUSTFLAGS="--cfg tokio_unstable"

 

2.4、支持的平台

tokio目前支持以下平台:

  • Linux
  • Windows
  • Andriod(API 級別21)
  • macOS
  • iOS
  • FreeBs

未來,tokio還會支持這些平台。然而,將來的版本可能會調整要求,這些要求包括libc版本,api級別,或者特定的FreeBSD版本。

 

除了這些平台,tokio也會傾向於在mio包能夠運行的平台上工作。 mio可以支持的平台參見:in mio’s documentation

然而,這些額外的平台,將來可能不被支持。

注意,Wine平台不同於windows。

2.4.1、wasm支持

tokio對於WASM平台的支持存在一些限制。

如果不啓用tokio_unstable標記,那麼可以支持以下特性:

  • sync
  • macros
  • io-util
  • rt
  • time

如果企圖支持其它特性,那麼會導致編譯失敗。

time模塊只會在那些支持timers(例如wasm32-wasi)的wasm平台上工作。

 

注意:如果運行時變得無限期空閒,那麼它會立刻終止,而不是永久阻塞。 對於不支持time的平台,這意味着運行時任何時候都不會變得空閒。

 

2.4.2、不穩定的WASM支持

tokio有可以不穩定地支持一些wasm特性。這種情況下要求啓用tokio_unstable標記。

tokio::net可以支持wasm32-wasi。然而,不是所有方法都支持網絡類型,當WASI不支持創建創建新的套接字的時候。

因此,套接字必須通過FromRawFd特質創建。

 

2.5、單元項目

 

2.5.1、重新導出

pub use task::spawn;

 

2.5.2、模塊

注:模塊基本和特性對應

編碼 名稱   説明
fs 文件 異步文件工具
io io 異步io中的特質,助手,和定義
net 網絡 TCP/UPD/Unix相關
process 進程 異步進程管理
runtime 運行時 tokio運行時
signal 信號 異步信號處理
stream 流相關
sync 同步 異步上下文中同步操作
task 任務 異步綠色線程
time 時間 追蹤時間的工具

 

2.5.3、宏

  1. join-等待併發的多個分支,當所有分支完成就會返回
  2. pin-在棧上釘住一個值
  3. select-等待多個併發的分支,當第一個任務完成,則返回,並放棄其它分支
  4. task_local-定義一個新的本地鍵,類型是tokio::task::LocalKey
  5. try_join-等待多個併發的分支,如果所有的成功Ok(_)則返回,如果發現一個Err()也會返回

 

2.5.4、屬性宏

  1. main-標記選定的運行時執行的異步函數。這個宏可以配置一個運行時,而不要求用户使用Runtime或者Builder
  2. test-類似main,但只用於測試環境。

 

2.6、單元包

相關單元包。

這個沒有什麼特別值得寫得內容,羅列下:

bytes 

顧名思義,和自己操作有關的,例如

use bytes::{BytesMut, BufMut};

let mut buf = BytesMut::with_capacity(1024);
buf.put(&b"hello world"[..]);
buf.put_u16(1234);

let a = buf.split();
assert_eq!(a, b"hello world\x04\xD2"[..]);

buf.put(&b"goodbye world"[..]);

let b = buf.split();
assert_eq!(b, b"goodbye world"[..]);

assert_eq!(buf.capacity(), 998);

bytes主要用於和網絡操作有關的場景。 tokio利用了零拷貝的編程。

可以使用rust已有的類型來創建字節數組,例如&[]或者Vev<u8>,但推薦使用Bytes

 

cfg_if

是一個配置有關的宏,構建類似if/else的代碼結構。

tokio由於需要需要兼顧多個平台,不可避免要存在不少的if/else.

示例:

cfg_if::cfg_if! {
    if #[cfg(unix)] {
        fn foo() { /* unix specific functionality */ }
    } else if #[cfg(target_pointer_width = "32")] {
        fn foo() { /* non-unix, 32-bit functionality */ }
    } else {
        fn foo() { /* fallback implementation */ }
    }
}

 

lock_api

和鎖相關的api。

大體可以看作是多標準rust鎖有關類型的封裝.好處在於省掉了不少繁複的操作,例如:

示例:

use lock_api::{RawMutex, Mutex, GuardSend};
use std::sync::atomic::{AtomicBool, Ordering};

// 1. Define our raw lock type
pub struct RawSpinlock(AtomicBool);

// 2. Implement RawMutex for this type
unsafe impl RawMutex for RawSpinlock {
    const INIT: RawSpinlock = RawSpinlock(AtomicBool::new(false));

    // A spinlock guard can be sent to another thread and unlocked there
    type GuardMarker = GuardSend;

    fn lock(&self) {
        // Note: This isn't the best way of implementing a spinlock, but it
        // suffices for the sake of this example.
        while !self.try_lock() {}
    }

    fn try_lock(&self) -> bool {
        self.0
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    unsafe fn unlock(&self) {
        self.0.store(false, Ordering::Release);
    }
}

// 3. Export the wrappers. This are the types that your users will actually use.
pub type Spinlock<T> = lock_api::Mutex<RawSpinlock, T>;
pub type SpinlockGuard<'a, T> = lock_api::MutexGuard<'a, RawSpinlock, T>;

 

mio

看起來有點像minio,但和minio不同,mio用於構建非阻塞的IO應用,而且它的實現比較快,層級比較低。能儘量減少操作系統的負荷。

mio包括多個子模塊,包括event,features,guide,net,windows。

 

parking_lot

提供了比rust標準庫更小更快更靈活的實現,包括Mutex,RwLock,Condvar,Once等.

看起來有點像lock_api

 

parking_lot_core

和parking_lot有關的內容。

 

pin_project_lite

輕量版本的pin-project。

 

proc_macro2

過程宏的包裝器。

例如:

extern crate proc_macro;

#[proc_macro_derive(MyDerive)]
pub fn my_derive(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
    let input = proc_macro2::TokenStream::from(input);

    let output: proc_macro2::TokenStream = {
        /* transform input */
    };

    proc_macro::TokenStream::from(output)
}

 

quote

宏,用於把rust語法樹數據結構轉為源碼。

let tokens = quote! {
    struct SerializeWith #generics #where_clause {
        value: &'a #field_ty,
        phantom: core::marker::PhantomData<#item_ty>,
    }

    impl #generics serde::Serialize for SerializeWith #generics #where_clause {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            #path(self.value, serializer)
        }
    }

    SerializeWith {
        value: #value,
        phantom: core::marker::PhantomData::<#item_ty>,
    }
};

 

scopeguard

範圍守衞,允許匿名函數在範圍之外運行。

看起來像歪門邪道!

extern crate scopeguard;

fn f() {
    let _guard = scopeguard::guard((), |_| {
        println!("Hello Scope Exit!");
    });

    // rest of the code here.

    // Here, at the end of `_guard`'s scope, the guard's closure is called.
    // It is also called if we exit this scope through unwinding instead.
}

 

smallvec

顧名思義,是小型的向量,但它的大小可變。可以提升性能,針對只需要很少數據的場景。

 

socket2

用於創建和使用套接字,和網絡編程相關。

但它的缺陷是不夠通用,因為要求儘量使用操作系統的已有的能力。

use std::net::{SocketAddr, TcpListener};
use socket2::{Socket, Domain, Type};

// Create a TCP listener bound to two addresses.
let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;

socket.set_only_v6(false)?;
let address: SocketAddr = "[::1]:12345".parse().unwrap();
socket.bind(&address.into())?;
socket.listen(128)?;

let listener: TcpListener = socket.into();
// ...

 

 

syn

名字容易聯想到synchronize,其實應該是syntax。

用於解析rust的代碼流為語法樹,進場和其它宏相關。

例如:

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, DeriveInput};

#[proc_macro_derive(MyMacro)]
pub fn my_macro(input: TokenStream) -> TokenStream {
    // Parse the input tokens into a syntax tree
    let input = parse_macro_input!(input as DeriveInput);

    // Build the output, possibly using quasi-quotation
    let expanded = quote! {
        // ...
    };

    // Hand the output tokens back to the compiler
    TokenStream::from(expanded)
}

 

tokio

運行時,用於編寫可靠的網絡應用,同時還不需要犧牲速度。

它是一個事件驅動,非阻塞的平台,能用於編寫異步應用。

 

其它

tokio_macros,unicode_ident,windows_sys,windows_targets,windows_x86_64_msvc.

 

Add a new Comments

Some HTML is okay.