本文整理自_ _OpenAI_ _基礎設施團隊的 Shuyi Chen 和 Joey Pereira 在 Current 2025 倫敦會議上的演講 ”_Building a Stream Processing Platform at OpenAI_“
主要演講內容為:
- OpenAI 的流式基礎設施;
- 構建流處理平台的動機及遇到的挑戰;
- OpenAI 的整體架構及深入解讀;
- OpenAI 業務用例以及平台未來的演進方向。
一年前的流式基礎設施
回顧一年前,OpenAI 的流式基礎設施主要圍繞 Kafka 及其生產者和消費者服務構建。Kafka 被廣泛用於數據攝入、異步處理和服務間通信。隨着 ChatGPT 的上線,Kafka 需求迅速增長,已成為支撐眾多關鍵業務的核心基礎設施之一。
我們面臨的主要挑戰之一是確保 Kafka 基礎設施的可用性和可靠性。我們的 Kafka 基礎設施構建在雲上,曾一度擁有數十個 Kafka 集羣。在雲環境中,集羣可能崩潰、區域可能失效、網絡光纜也可能被切斷。因此,單個 Kafka 集羣可能成為依賴它的使用場景的單點故障。我們確實經歷過單個 Kafka 集羣故障對業務造成嚴重影響的實例。
為應對這一挑戰,我們引入了“高可用組”(high availability group)的概念。一個高可用組將跨區域的多個物理 Kafka 集羣組合在一起,以提供高可用性。這樣,當某個集羣故障時,我們可以繞過它,為生產者和消費者服務提供 HA 保障。例如,典型的 HA 組配置包括一個 West US 集羣、一個 Central 集羣和一個 East US 集羣。
然而,HA 組中多集羣的引入也為生產者和消費者服務帶來了不小的複雜性,因為它們必須理解 Kafka 基礎設施的底層拓撲。為解決這一問題,我們構建了生產者和消費者代理(proxy)進程,將基礎設施細節對用户隱藏,所有複雜性都封裝在代理之後。該代理為 Kafka 的生產和消費提供了一個簡單且一致的接口。
例如,當 East US 集羣開始故障時,生產者和消費者端的代理會將流量繞過故障集羣。同樣,我們也可以向 HA 組中添加一個新集羣(例如 South US),而這一切對生產者和消費者服務都是透明的。
關於 Kafka 基礎設施設置的更多細節,請參考我們團隊在本次會議上關於 Kafka 遷移以及 OpenAI 如何簡化 Kafka 消費的演講。
為何需要流處理?
隨着 Kafka 使用量的增加,我們自然開始思考:流處理(stream processing)或 Apache Flink 能帶來什麼?
場景一:數據飛輪(Data Flywheel)
從高層次看,數據飛輪是一個自強化系統,其中數據生成、模型改進和產品使用不斷相互促進,以推動性能和價值的提升。我們發現,更快地將產品使用數據反饋給大模型,實際上能帶來有意義的差異。流處理技術可以通過提供一個可擴展的框架,在 OpenAI 的規模上近乎實時地處理和轉換數據,從而幫助實現數據飛輪的目標。
場景二:實驗數據處理與攝入
在當今的 AI 開發中,快速實驗和迭代對模型開發至關重要。能夠快速處理、關聯並可視化實驗結果,對於加速模型開發非常重要。在流處理出現之前,工程師和研究人員有時會為處理大量實驗數據而構建自定義的臨時系統。這些系統通常涉及複雜的關聯或狀態管理,並且由於在大規模運行系統時的挑戰,也容易出現數據新鮮度問題。這正是 Apache Flink 等流處理技術可以大放異彩的地方——它為預處理實驗數據提供了一個穩健且可擴展的基礎。
除此之外,還有其他業務使用場景,我們稍後會詳細介紹。
構建流處理平台的挑戰
接下來,我將談談我們在 OpenAI 構建流處理平台時遇到的一些挑戰。
挑戰一:Python
雖然大多數開源流處理技術都是基於 JVM 的,但在 AI 領域,Python 是事實上的標準語言。在 OpenAI,許多業務處理邏輯和服務都是用 Python 編寫的,幾乎沒有 Java 支持。儘管 Apache Flink 提供了 Python 支持,但總體而言,其開發和採用相對較新,與 Java 版本相比也還不夠成熟。
挑戰二:雲廠商的限制
我們經常發現,雲廠商宣傳的 Kubernetes 集羣最大規模往往過於樂觀——在實際生產中,受限於控制平面的性能瓶頸,我們很難穩定運行接近該上限的集羣。此外,在實踐中,由於某些區域的物理限制,我們很難從這些區域獲得足夠的容量。為了滿足運行流處理工作負載的容量需求,我們從一開始就不得不在多個 Kubernetes 集羣之上構建我們的平台。而且,正如之前提到的,在雲環境中,集羣和區域都可能失效,因此我們的平台也必須能夠跨區域可靠運行。
挑戰三:高可用 Kafka 集羣帶來的複雜性
最後但同樣重要的是,我們之前提到的 HA Kafka 集羣設置,也為運行 Apache Flink 等框架帶來了挑戰。在 Kafka HA 組設置中讀取一個主題(Topic),實際上會轉化為並行地從多個物理 Kafka 集羣進行多次消費,如果實現不當,反而可能導致可用性降低。
平台架構概覽
在設計流處理平台時,我們始終牢記上述挑戰。以下是我們的整體架構概覽。
首先,我們決定使用 PyFlink 作為主要的流處理框架,並與 Flink 社區合作,持續改進 PyFlink。這使我們的所有用户都能利用 Apache Flink 提供的流處理技術,同時還能複用所有現有的 Python 庫來構建他們的流處理管道。事實證明,使用 Python 也幫助提高了我們用户的開發速度和生產力。
其次,在每個 Flink Kubernetes 集羣內部,我們使用開源的 Flink Kubernetes Operator 來管理 Flink 作業。我們在跨區域的活躍 Flink 集羣之上構建了一個控制平面(control plane)抽象層。這使我們能夠通過單一的控制平面集中管理所有 Flink 作業。
最後,我們還將 Flink 與 OpenAI 的 Kafka 生態系統進行了深度集成,以確保 Flink 能夠與我們上面討論的 Kafka HA 設置可靠地協同工作。
平台架構細節
從宏觀角度看,用户和其他平台(例如機器學習平台)通過控制平面抽象層與流處理平台交互。這裏的控制平面旨在為管理所有流處理管道提供一個統一的入口。
為了讓 Flink 對我們的工程師更易用,我們將其與現有的服務腳手架、測試、構建和部署基礎設施進行了深度集成,使用户可以遵循與微服務開發相同的工作流程。控制平面將負責跨不同區域的多個 Kubernetes 集羣協調作業管理。
在每個 Kubernetes 集羣內部,我們使用開源的 Flink Kubernetes Operator 來編排 Flink 作業。該 Operator 為 Kubernetes 集羣內的管道提供生命週期管理。我們將每個 Flink 作業作為 Flink Deployment 自定義資源運行。Flink 部署通過 Kubernetes 命名空間在不同團隊和組織之間進行隔離。我們為每個命名空間運行一個專用的 Flink Kubernetes Operator。
雖然 Flink Kubernetes Operator 處理了 Flink 的大部分管道生命週期管理,但為了滿足 OpenAI 的特定需求,我們還設置了一個跨集羣的看門狗(watchdog)服務,用於監控 Flink 作業所依賴的 OpenAI 特定配置變更。例如,看門狗服務會定期檢查每個 Flink 作業的主題的 Kafka 拓撲。如果我們發現有新的物理集羣被添加或移除,看門狗將觸發 Flink 作業的重啓,以便它能獲取最新的 Kafka 拓撲變更,從而避免數據丟失或延遲。
對於有狀態的管道,我們使用本地 RocksDB 來存儲狀態,併為每個命名空間設置 Azure Blob Storage 賬户,併為該賬户啓用異地複製(geo-replication)。在主區域發生故障時,我們可以初步故障轉移到輔助區域。目前,平台為所有團隊管理 Azure Blob Storage 賬户,但我們也允許用户選擇提供自己的 Blob 存儲賬户。
在構建過程中,我們遇到了一個需要注意的問題:目前開源的 Apache Flink 實際上並不支持 Azure Workload Identity 身份驗證,而這是 Azure 推薦的用於安全訪問存儲賬户的方式。為了解決這個問題,我們內部將 hadoop-azure 庫升級到了 3.4.1,以啓用 Azure Workload Identity 身份驗證。我們也計劃將此貢獻回社區。
深入 PyFlink
現在,讓我們深入探討幾個關鍵話題。
首先,我們來看看 Python。開源的 PyFlink 提供了 DataStream API 和 Table/SQL API。在 OpenAI 內部,我們將 PyFlink 與我們的單體倉庫(monorepo)系統集成,使用户可以像開發常規 Python 項目一樣,複用所有現有的 Python 庫。
PyFlink 使用了大部分 Flink JVM 棧,並在 Flink SDK 和運行時中增加了對運行 Python 函數的支持。在 SDK 側,它基本上使用 Py4J 將新的 Python DataStream 和 Table/SQL API 映射到 Java 版本。在運行時側,Python 函數被映射到 Java 圖中的自定義 Python 算子。該 Python 算子由運行用户 Python 邏輯的 Python Worker 以及與 Python Worker 通信的自定義 Java 算子組成,後者負責處理檢查點(checkpointing)、水印(watermarking)以及與 Python Worker 的數據和狀態交換。
PyFlink 目前支持兩種不同的執行模式來運行 Python 用户自定義函數:進程模式(process mode)和線程模式(thread mode)。默認模式是進程模式。在進程模式下,用户的 Python 函數作為單獨的進程運行,並使用 Apache Beam 的可移植性框架與 JVM 算子通信。它具有良好的資源隔離性,總體上也更成熟。然而,其侷限性在於 IPC 開銷,因為它們使用 gRPC 在 JVM 進程和 Python 進程之間進行通信。這會帶來序列化和反序列化的開銷。此外,這也需要更多的調優參數來適應不同類型的工作負載,例如批處理大小(batch size)和批處理超時(batch timeout)。
PyFlink 也支持線程模式。在線程模式下,用户的 Python 函數在與 JVM 線程相同的進程中運行。它帶來了吞吐量、延遲的提升以及更短的檢查點時間。然而,其侷限性在於目前僅支持 CPython 和應用模式(application mode),總體上不如進程模式成熟。我們實際上與社區委員會合作,修復了線程模式中的幾個問題,包括日誌記錄以及 JVM 中的共享對象加載。
到目前為止,我們已經在 OpenAI 將 PyFlink 投入生產。然而,我們也觀察到了一些挑戰,首先是效率問題。基本上,正如我們所見,所有的 Python 函數(用户邏輯)都在 Python 中運行,並且在進程模式下 IPC 期間會產生額外的序列化和反序列化成本。因此,對於大規模作業,我們也支持用户用 Java 實現他們的處理函數。PyFlink 實際上支持從 Python DataStream API 調用它們,因此我們可以支持用 Python 編排流處理邏輯,但實際代碼將在 JVM 中運行。
此外,異步 I/O(async I/O)和流式關聯(streaming join)在 Python 的 DataStream API 中尚未得到支持。我們計劃與社區合作,增加這些支持。最後,PyFlink 目前還不支持 Python 3.12,我們也在內部和社區中努力增加這一支持。
Flink 與 Kafka 的集成
接下來我們聊聊 Kafka 連接器——需要特別説明的是,這裏指的是 Flink 自帶的原生連接器,而非 Confluent 提供的版本。如前所述,我們的 Kafka 部署採用了高可用組(HA Group)架構:多個跨區域的物理集羣組成一個邏輯集羣,目的是確保即使其中一個集羣完全失效,整個系統仍能正常運行。這就帶來了一個核心挑戰:如何讓 Flink 應用適配這種多集羣架構?我們能否構建出真正容忍 Kafka 集羣中斷的流處理作業?
乍看之下,這似乎並不難。我們只需要在 Source 和 Sink 兩端“堆”幾個 Kafka 連接器就行了。先看 Source 端:拿到集羣列表後,為每個集羣創建一個 Kafka Source,再把它們 union 起來接入主處理邏輯——問題不就解決了嗎?
可惜,現實沒那麼美好。
你可能已經猜到了結果——否則我也不會花時間專門講這一段。事實上,當某個 Kafka 集羣宕機後不久,我們的 Flink 應用就開始頻繁崩潰。深入排查後,我們發現根本原因在於連接器的初始化機制:當 Kafka Source 啓動時,它會嘗試獲取主題的分區元數據、枚舉所有分區,併為每個分區創建讀取任務。如果此時某個集羣不可達,Flink 內部對 Kafka 元數據的請求就會失敗,並直接拋出異常——整個作業因此被拉垮,毫無容錯能力可言。
通過對 Kafka Source 進行定製化改造,我們將其配置為在元數據請求失敗時無限重試。這樣一來,那些已經成功初始化、正在從 Kafka 分區讀取數據的任務會繼續正常運行;而針對故障集羣的讀取任務則會持續重試,雖不產出數據,但也不會導致作業崩潰。這已經滿足了我們的首要目標:保證應用持續運行。
更進一步,即使在某個 Kafka 集羣宕機的情況下,我們也能正常重啓 Flink 作業——它會跳過不可達集羣的分區,僅對可用集羣創建讀取器。這讓我們離理想狀態又近了一步。
但這還不是全部。我們的 Flink 應用在啓動時會動態加載集羣配置,並據此構建 Kafka Source 列表。正如前文提到的,由於作業拓撲依賴於這份配置,一旦配置變更(例如移除或新增集羣),就必須重啓作業才能生效。為此,我們引入了一個“看門狗”(watchdog)服務,持續監聽配置變化,並在必要時觸發作業重啓。
這意味着,當某個集羣徹底失聯、確認無法恢復時,我們可以直接從應用配置中將其移除,徹底停止對其消費;而當它恢復後,只需重新加入配置,配合自定義的偏移量初始器(initializers),就能從上次中斷的位置繼續消費(大致如此)。通過這種方式,我們實際上擁有了兩層容錯機制:運行時重試 + 配置驅動的動態調整。
需要特別説明的是,這套方案目前主要應用於不依賴水印(watermark)的管道。如果管道使用了水印,而某個 Source 因集羣宕機停止輸出數據,就會導致水印無法推進,整個流處理邏輯被卡住。理論上可以通過設置空閒超時(idle timeout)來緩解,但這一路徑我們尚未像無水印場景那樣充分驗證。
當然,一旦故障集羣恢復,積壓的數據會重新流入系統——不過會被標記為遲到事件(late events)。從設計角度看,這其實提供了一種明確的權衡選擇:用户可以在強一致性視圖和高可用性之間做出取捨。選擇後者,就意味着接受更多遲到數據,但換來的是系統在部分基礎設施失效時仍能持續運轉的能力。
關於 Source 端,還有一點值得補充:Flink 開源社區其實已經提供了一個非常實用的方案——動態 Kafka Source(Dynamic Kafka Source)。它將多個主題或多個集羣的 Kafka 源統一抽象為一個真正的 Flink Source。你可以實現一個輕量級的元數據服務(或一個簡單的類),動態指定當前需要消費哪些集羣和主題。更重要的是,它支持在運行時重新加載元數據,並直接將新發現的分區分配給任務,完全無需修改作業拓撲或重啓作業。
這意味着,你不再需要依賴外部的“看門狗”來觸發重啓——配置變更可以實時生效,靈活性大幅提升。而且,這個功能是完全開源的。它的任務分配邏輯與原生 Kafka Source 高度一致:分配的最小單元不是單純的“分區”,而是 (分區 × 主題 × 集羣) 的組合,能準確反映多集羣拓撲結構。
我們目前尚未在生產中採用它,主要有兩個原因:一是 PyFlink 尚未提供對應的 Python 封裝(wrapper),二是它在某些細節上仍有侷限——例如,無法為每個集羣和主題單獨配置偏移量初始器(offset initializers)。
儘管如此,它的整體表現已經相當可靠,遠勝於手動重啓作業並喊一句“嘿,醒醒,加個集羣!”。因此,我們計劃儘快為 PyFlink 補齊對動態 Kafka Source 的支持,並在落地過程中持續修復和優化,充分釋放其潛力。
有了這個方案,我們終於有了一條清晰可行的路徑:即使某個 Kafka 集羣宕機,Flink 作業也能無縫繼續消費其他集羣的數據,用户幾乎感知不到任何中斷。
現在來看 Sink 端。我們最初設想了一個看似簡潔的方案:通過一個分流函數,將主數據流均勻拆分到多個旁路輸出(side outputs)中,再為每個 Kafka 集羣分別掛載一個 Sink。理想情況下,三個集羣各承擔約三分之一的流量——邏輯清晰,實現簡單,當時我們甚至覺得這方案穩了……
可惜,現實很快給了我們一記重擊。你大概已經猜到結果了:一旦某個 Kafka 集羣下線,整個作業幾乎立刻陷入停滯,無法繼續處理數據。
更深層次的問題在於,這種設計存在結構性缺陷。由於所有 Sink 共享同一個處理流水線,任何一個集羣出現性能波動(比如網絡延遲、磁盤瓶頸或限流),都會通過背壓(backpressure)傳導至整個作業,拖慢所有數據流。我們原本希望通過靜態均分來實現負載均衡,卻忽略了現實環境中的異構性——不同區域的 Flink 作業與 Kafka 集羣之間的寫入能力差異極大:有的集羣吞吐迅猛,有的則響應遲緩,固定比例的分流非但無法均衡負載,反而放大了系統短板。
那我們怎麼辦?坦白説,我們“走了一條捷徑”。事實上,OpenAI 的大多數服務早已在使用一個統一的生產者代理,它封裝了重試、斷路器(circuit breakers)、集羣動態發現等高可用邏輯。於是,我們直接把這個代理調用包裝成一個 Flink 函數來實現 Sink 功能。當然,這樣做也帶來了一些限制和注意事項。
首先,我們的代理並不真正支持基於鍵的自定義分區(key-based custom partitioning)。而如果強制使用鍵分區,就會把某個分區的數據綁定到單一 Kafka 集羣上——這與我們追求高可用的目標直接衝突。畢竟,我們不想讓任何一個 Kafka 集羣成為系統的單點依賴。 其次,事務(transactions)目前也無法支持。要在代理中集成完整的狀態管理與事務語義,複雜度極高,幾乎得不償失。需要特別説明的是,由於我們將代理封裝成了一個普通的 Flink 函數,天然受限於函數的執行模型——事務性寫入無法在單個函數內實現,必須通過完整的 Sink 實現才能支持。 最後,這種方案還引入了額外的性能開銷,尤其是在高吞吐場景下,代理層的調用鏈路會成為瓶頸,這顯然不是理想狀態。 那麼,下一步怎麼優化?我們正在規劃將這一能力重構為一個真正的開源 Sink 實現,並計劃向社區提交正式提案,打造一個既高可用又高性能的標準解決方案。對於非事務性寫入,實現起來其實相對直接:你可以替換底層的寫入器(writer)實現,讓它將數據分發給多個 Kafka 集羣的寫入器,從而構建一個寫入器池。在這個池子之上,你可以封裝重試、故障轉移、負載均衡等邏輯。更重要的是,這套機制可以像“動態 Kafka 源”那樣支持運行時動態配置——集羣的增刪、遷移等操作都能實時生效,無需重啓作業,極大提升了運維靈活性。 相比之下,事務性寫入的實現要複雜得多。雖然當前的 Kafka Sink 內部確實有一個生產者池(pool of producers),理論上可以嘗試將其擴展到多集羣場景,但一旦引入跨集羣事務,就會迅速陷入各種邊緣情況的泥潭:事務邊界如何界定?故障時如何回滾?不同集羣間的協調如何保證?這些問題目前都沒有成熟的解決方案。儘管如此,我們仍認為存在一條可行的技術路徑——通過更精細的控制和設計,未來有望實現一種既支持高可用又兼顧遷移靈活性的事務性發布方案,遠優於當前依賴靜態配置的替代方案。 結合 Source 和 Sink 兩端的實踐,你應該已經能感受到:在真實的大規模生產環境中,面對多集羣、跨區域的 Kafka 架構,流處理系統必須在可用性、一致性與運維效率之間做出務實權衡。而我們的探索,正是為了在這條複雜路徑上找到更穩健的前行方式。
讓我們稍作停頓,重新聚焦一下:為什麼這些設計對我們如此關鍵?
因為在實際運行中,故障不是“會不會發生”的問題,而是“何時發生”的問題。中斷早已成為常態,而非例外。你可能經歷過,也可能沒經歷過——但在 OpenAI,我們確實遇到過諸如整個區域宕機、光纜被挖斷導致跨區域延遲驟增等極端情況。有時問題甚至源於我們自己的操作失誤。但無論原因如何,系統都必須能扛得住。
正因如此,我們必須從底層基礎設施到上層應用,全棧考慮容錯能力,確保流處理作業在任何異常情況下都能持續穩定運行。
回顧一下我們前面提到的關鍵挑戰:
- Flink 集羣可能失效 → 我們需要能在多個 Kubernetes 集羣間自由遷移作業;
- 存儲可能丟失 → 我們依賴異地複製的存儲,並支持主備切換;
- Kafka 集羣可能中斷 → 我們通過高可用組和代理層,確保任一 Kafka 集羣都不是單點故障,無論是在消費端還是生產端。
當然,脱離實際場景談架構意義有限。接下來,我們通過幾個真實受益於這套設計的業務管道,來看看這些能力是如何落地的。
用例一:實時 Embedding 生成
我們有一類典型的管道,用於為各類產品實時生成 Embedding。其邏輯非常直接:接收輸入數據,調用模型服務(RPC),再將結果輸出。這類任務之所以選擇 Flink,一方面是因為其 API 簡潔易用,但更關鍵的原因在於——我們需要將結果同步分發到多個下游區域,而這些區域各自託管着對應的數據副本。
在這個場景中,數據的新鮮度遠比“截至某個水印的完整視圖”更重要。因此,系統必須具備容忍單個 Kafka 集羣故障的能力:即使某個區域的集羣宕機,仍能繼續消費其他活躍區域的數據流。這類管道通常不依賴水印,也不需要對遲到數據做特殊處理;我們只期望當故障集羣恢復後,積壓的數據能自然流入並被正常處理——就像什麼都沒發生過一樣。
用例二:傳統 ML 特徵計算
另一個典型場景是傳統機器學習特徵的實時計算——畢竟,一場不提 AI 的 OpenAI 技術分享多少有點説不過去。
藉助像開源項目 Chronon 這樣的框架,我們可以用聲明式方式定義特徵邏輯(例如 “統計用户過去 1 小時內點擊按鈕的次數”),然後由系統自動編譯成 Flink 作業執行。這類管道同樣遵循 OpenAI 內部廣泛採用的 “一次計算,到處分發”(compute once, distribute everywhere) 範式。原因很實際:原始數據往往來自多個區域,而下游應用可能部署在本地,或需要在多個區域冗餘存儲同一份特徵數據(即便某些區域使用頻率較低)。
特別值得注意的是,輸入數據本身也是跨區域分佈的。這進一步強化了我們的架構要求:Kafka 集羣絕不能成為任一區域的單點故障——否則,特徵計算的完整性將直接受到威脅。
未來工作
在結束本次分享之前,我們想簡要談談未來的演進方向。這一路走來,我們踩過不少坑,也向社區提交了一系列 issue 和 PR。其中不少集中在 PyFlink 上——比如相比 Java 版本仍存在的功能缺失或穩定性問題;還有一些則涉及更細粒度的部署定製和雲環境特有的挑戰。
但值得慶幸的是,整個生態是開源的,任何人都可以參與共建。雖然 PyFlink 的成熟度尚不及 Java,但社區響應迅速、協作氛圍良好。對我們而言,它絕非“洪水猛獸”,而是一個值得投入的方向——尤其當你的用户羣體主要是 Python 工程師時,讓他們直接用熟悉的語言開發流處理作業,遠比説服他們轉用 Scala 或 Java 來得高效。
除了持續回饋開源社區,我們也在思考如何提升平台自身的自動化能力。目前,控制平面主要依賴部署系統調用 CLI 工具進行管理。未來,我們希望構建一個統一的 Flink 應用管理平台,能夠智能決策諸如:
- “這個作業該調度到哪個 Kubernetes 集羣?是基於負載、資源位置,還是容災策略?”
- “何時自動擴縮容底層集羣?”
- “當某個集羣異常時,能否自動觸發跨集羣故障轉移,而不是半夜把工程師叫起來手動遷移?”
歸根結底,我們的目標是讓平台真正“好用”。為此,我們正在探索一些關鍵體驗優化,例如:
- 自助式 SQL 管道:工程師打開一個 Notebook,就能像寫查詢一樣快速構建流處理邏輯;
- 完整的 PyFlink 功能支持:確保異步 I/O、流式 Join、Python 3.12 兼容等能力盡快落地;
- 端到端可靠性提升:包括零停機部署、動態連接器更新等。
最終,我們希望用户只需關注業務邏輯本身——無論是寫一段 SQL 還是一個 Python 函數——而無需操心“作業怎麼跑、狀態怎麼存、集羣掛了怎麼辦”。平台應該默默扛下所有複雜性,讓開發者專注創造價值。
以上就是我們今天的全部內容,感謝大家的聆聽!現在進入問答環節,歡迎提問。
Q&A 環節
問:Flink 應用程序的軟件生命週期是怎樣的?
在批處理或數據倉庫場景中,我們可以反覆重跑全量數據、調整查詢、驗證結果,直到輸出正確為止。但在你們的 Flink 流處理架構下,這個過程是如何實現的?比如,當我要上線一個包含新字段的新版本作業時,是否需要重放 TB 級的歷史數據?
答:
目前在 OpenAI,PyFlink 作業默認啓動時會從 Kafka 中最早的可用偏移量(即保留窗口起點)開始消費。我們為 Kafka 主題默認保留 7 天的數據,因此大多數情況下可以通過重放這 7 天的數據來驗證或更新作業邏輯。
如果作業邏輯發生變更,同時 Kafka 主題的 Schema 也發生了變化(例如新增字段),我們當前主要通過回填(backfill) 的方式支持數據重處理——即重新消費 Kafka 中的歷史數據並應用新邏輯。不過,目前我們的回填能力僅限於從 Kafka 本身讀取,尚不支持從數據湖等其他存儲源進行重新消費。
至於自動化程度:當你部署新版本作業時,系統還不會自動觸發全量回放。是否重放、從哪裏開始重放,目前仍需用户手動配置。這確實是平台的一個待完善點。
需要説明的是,我們的 Flink 平台投入生產的時間並不長——大約從去年年底才開始規模化使用。因此,在作業生命週期管理、自動化回填、Schema 演進支持等方面,我們還有大量工作要做,也歡迎社區和用户一起推動這些能力的落地。
問:你們向用户暴露的編寫 Python 作業的接口是什麼? 你們是否集成了 Jupyter Notebook,或者圍繞 Jupyter Notebook 做了 CI/CD?
答: 我們向用户暴露的接口與我們內部工程師開發 Python 項目和 Python 微服務的方式相同。他們不會去寫一個 FastAPI 應用,而是會使用 PyFlink API 編寫一個 main.py。這是相當標準的 IDE——VS Code、IntelliJ——IDE 體驗與編寫微服務時完全相同,只是使用的不是 FastAPI 框架,而是 PyFlink 框架。因此,你可以使用所有工程師在開發微服務時使用的 Python 庫。
人們測試或開發的一些方式通常是使用開發環境中的一些共享或可用的測試集羣。所以你能夠相當快速地進行推送——我認為只需幾分鐘就能運行一個針對測試環境的部署命令,它就會帶着你的本地代碼在那裏運行。
問:你們是否也提供某種能力,對存儲在數據湖倉中的數據進行重處理——比如用文件源或 Sink 替換 Kafka?
答: 我們目前沒有這個功能,但這在你想處理更復雜的事情時基本上是必要的——比如如果你需要比 Kafka 中或你的保留期更多的歷史數據。也許你出於各種原因對保留期做了相當激進的削減,那麼這就會變得更加具有挑戰性——這基本上是必要的,但我們目前還沒有這樣做。
問:你們提到的集羣和處理之間的代理,是開源的還是內部專有的?
答: 它是內部的。它基本上是圍繞標準 Java Kafka 客户端的一個常規腳手架,包含了我們想要和需要的所有邏輯,比如故障轉移、斷路器、身份驗證、重試邏輯等。你不想為了添加一個新的 Kafka 集羣或修復一個問題而去部署一百個服務——你只需部署一個代理。所以它實際上是一個輕量級的 gRPC 服務。
補充一點:生產者代理是內部的,正如前文所提到的。但對於消費者代理,我們實際上使用了 Uber 的 uForward 代理,這是開源的。我們的隊友在另一個演講中介紹了我們如何利用消費者端代理來簡化 OpenAI 的 Kafka 消費。
問:你們談到的業務用例是假設性的還是真實的?例如,Embedding 生成是否真的在使用這個管道?如果它們是真實的,那麼 Flink 替代了什麼?你們在早期技術中遇到了什麼問題,以至於需要使用 Flink?
答: 這些用例大多是真實生產場景的簡化版,而且基本都屬於全新構建的項目。我們並沒有用 Flink Pipeline 去替換任何已有的系統,而是當團隊在解決新問題時,往往會先嚐試一些臨時甚至“有點瘋狂”的方案——比如在數據庫裏手動排隊、寫腳本輪詢等。這時我們就會介入,建議他們:“不如試試用一個 Kafka 主題加一個 Flink 應用?這樣可能比手搓一套更簡單、也更可靠。” 因此,這些 Flink 應用通常服務於全新的功能或能力,而非對舊系統的遷移。
以 Embedding 實時生成為例,在實際落地過程中,我們就遇到了一個具體挑戰:PyFlink 目前缺乏對異步 I/O 的原生支持。為了解決這個問題,我們不得不自行實現繞過方案。但與此同時,我們也正積極與 Flink 社區合作,推動將異步 I/O 能力正式集成到 PyFlink 中,以便更好地支撐這類高併發、低延遲的典型流處理場景。
問:對於高可用性,對於一個特定的主題,你們是將主題存儲在多個集羣或區域中,還是隻在一個特定區域中?
答: 是的,每個主題的數據實際上會分佈在多個區域的 Kafka 集羣中。你可以把它理解為一種邏輯上的分片(sharding):生產時,我們會根據隨機策略或優先級規則,將數據寫入其中一個集羣。如果某個集羣宕機,系統會自動嘗試其他可用集羣;如果斷路器已經觸發,甚至根本不會考慮那個故障節點。因此,數據天然就是跨集羣分佈的。
這意味着,任何 Kafka 消費者——無論是否基於 Flink——都必須從多個區域同時讀取數據,才能獲得完整的視圖。 沒錯,這正是我們構建生產者和消費者代理的核心原因:把多集羣拓撲這類基礎設施複雜性完全封裝起來,對上層應用透明。用户只需像使用單個 Kafka 集羣一樣進行生產和消費,無需關心底層到底有幾個集羣、哪個可用、如何路由。
這些代理是獨立服務嗎? 是的,我們將其設計為獨立的中間服務。這樣做的好處顯而易見:如果重試邏輯、集羣列表或故障轉移策略分散在上百個業務服務裏,作為平台團隊,每次調整配置或修復問題都將是一場災難。而通過集中式的代理服務,我們可以快速迭代、統一治理,所有客户端只需通過它通信即可——既降低了用户的心智負擔,也極大提升了平台的可維護性。
問: PyFlink 相比基於 Java/JVM 的 Flink 應用肯定有額外開銷。你們有沒有具體的性能數據,能説明使用 Python 到底會帶來多少資源損耗?
答: 我們目前還沒有進行正式的基準測試,但從實際運行情況來看,由於用户邏輯是在 Python 進程中執行的,PyFlink 作業在 CPU 和內存資源消耗上確實明顯高於純 JVM 實現。
正因如此,對於真正大規模、高吞吐的作業,我們提供了一種混合開發模式:用 Python 編排作業拓撲(比如定義 Source、Sink 和算子連接關係),但將核心計算邏輯實現在 JVM 算子中。這樣既能保留 Python 在開發效率和生態集成上的優勢,又能確保關鍵路徑的性能和資源效率。
事實上,已經有多個團隊在實踐中採用了這種策略。當他們遇到作業性能瓶頸或內存不足(OOM)問題時,第一反應往往是:“能不能把這段計算密集型邏輯移到 Java 算子裏?”——而通常這樣做並不複雜,只需少量重構,就能顯著改善資源使用和穩定性。
問: 從 Kafka 的角度看,你們如何應對流量高峯——比如 GPT 圖像生成功能上線時的突發流量?Kafka 是如何擴展以應對這類場景的?
答: 實際上,對我們而言,最大的流量增長往往來自新功能或新用例的接入,而非產品發佈本身。雖然像 GPT 圖像生成上線這樣的事件確實會帶來流量激增,但這類情況通常比外界想象的更容易提前規劃。
在 OpenAI,團隊在設計階段就會主動與我們溝通:“我們打算做這個功能,計劃用 Kafka。” 這時我們會深入討論關鍵問題:消息量級是多少?是否需要按用户或消息鍵保序?數據保留週期多長?吞吐和延遲要求如何?基於這些信息,我們就能提前做好容量評估、集羣擴容和分區規劃。
正因為有這種前置協作機制,大多數高流量場景都能被平穩承接,實際運行中並沒有出現“措手不及”的情況。