博客 / 詳情

返回

Apache Flink 2.2.0: 推動實時數據與人工智能融合,賦能AI時代的流處理

Apache Flink PMC 很高興地宣佈 Apache Flink 2.2.0 版本發佈了。Flink 2.2.0 版本進一步增強了 AI 函數 和 向量檢索功能,改進了物化表和連接器框架,並優化了批處理和 PyFlink 支持。Flink 2.2.0 版本總共由來自全球的 73 位貢獻者參與,累計推進了 9 個 FLIP(Flink 重要改進提案),完成了 220 多項缺陷修復和改進。

Flink 2.2.0 版本無縫集成實時數據處理與人工智能,開啓了人工智能時代。該版本增強了用於大規模語言模型推理的 ML_PREDICT 和用於實時向量搜索的 VECTOR_SEARCH,從而增強了流式人工智能應用的能力。重點功能包括:物化表增強、Delta Join優化、均衡任務調度和更多連接器優化(包括限流框架和均勻分片),顯著提升了處理性能、可擴展性和可靠性,為構建智能、低延遲的數據管道奠定了堅實的基礎。我們衷心感謝所有貢獻者的寶貴支持!

接下來讓我們深入瞭解Flink 2.2.0版本的重點內容。

Flink SQL 改進

實時AI函數

從 Flink 2.1 版本起,Apache Flink 通過 Flink SQL 中的 ML_PREDICT 函數支持使用 LLM 功能,用户能夠以簡單高效的方式執行語義分析。在 Flink 2.2.0 版本中,Table API 支持了模型推理操作,允許將機器學習模型直接集成到數據處理中,並使用特定提供商(例如 OpenAI)的模型對數據進行預測處理。

使用示例:

  • 創建並使用模型
// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);

Model model = tEnv.fromModel("my_model");

// 4. Use the model to make predictions
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(
    myTable, 
    ColumnList.of("text"), 
    Map.of("async", "true")
);

更多信息

  • FLINK-38104
  • FLIP-526

向量搜索

Apache Flink 通過 ML_PREDICT 函數和大模型進行了無縫銜接,已在情感分析、實時問答系統等場景中得到技術驗證。然而目前的架構僅允許 Flink 使用嵌入模型將非結構化文本數據轉換為高維向量特徵,然後將這些特徵持久化到下游存儲系統,缺乏對向量空間進行實時在線查詢和相似性分析的能力。

Flink 2.2.0 提供了 VECTOR_SEARCH 函數,使用户能夠直接在 Flink 中執行流式向量相似性搜索和實時上下文檢索。

以下SQL語句為例:

-- Basic usage
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10
);

-- With configuration options
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10,
  MAP['async', 'true', 'timeout', '100s']
);

-- Using named parameters
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  SEARCH_TABLE => TABLE vector_table,
  COLUMN_TO_QUERY => input_table.vector_column,
  COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
  TOP_K => 10,
  CONFIG => MAP['async', 'true', 'timeout', '100s']
);

-- Searching with contant value
SELECT * 
FROM VECTOR_SEARCH(
  TABLE vector_table,
  ARRAY[10, 20],
  DESCRIPTOR(index_column),
  10,
);

更多信息

  • FLINK-38422
  • FLIP-540
  • Vector Search

物化表

物化表是 Flink SQL 中引入的一種新的表類型,用於簡化批處理和流式數據管道,提供一致的開發體驗。創建物化表時,只需指定數據新鮮度和查詢條件,引擎即可自動生成物化表的模式,並創建相應的數據管道以保證指定的數據新鮮度。

從 Flink 2.2.0 開始,FRESHNESS 不再是 CREATE MATERIALIZED TABLE 和 CREATE OR ALTER MATERIALIZED TABLE DDL 語句的必要組成部分。Flink 2.2.0 引入了一個新的 MaterializedTableEnricher 接口,該接口為自定義的默認邏輯提供了一個正式的擴展方式,允許高級用户實現“智能”的默認行為(例如,從上游表推斷數據新鮮度)。

此外,用户可以使用 DISTRIBUTED BYDISTRIBUTED INTO 來支持物化表的分桶。用户還可以使用 SHOW MATERIALIZED TABLES 來展示所有物化表。

使用方式如下:

CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    DISTRIBUTED INTO 5 BUCKETS
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT
        ds
    FROM
     ...

更多信息

  • FLINK-38532
  • FLINK-38311
  • FLIP-542
  • FLIP-551

SinkUpsertMaterializer V2

SinkUpsertMaterializer是 Flink 中的一個算子,在亂序的變更事件發送到 upsert 接收器之前對其進行協調。在某些情況下,這個算子的性能會呈指數級下降。Flink 2.2.0 引入了一種針對此類情況優化的新版本實現。

更多信息

  • FLINK-38459
  • FLIP-544

Delta Join

Apache Flink 2.1 版本引入了新的Delta Join算子,以緩解regular join中由於龐大狀態帶來的問題。通過雙向查找 join 取代了regular join維護的大量狀態,直接重用源表中的數據。

Flink 2.2.0 增加了對更多 SQL 模式轉換為Delta Join的支持。Delta Join現在支持在不使用 DELETE 操作的情況下應用 CDC 數據源,並允許在數據源之後進行投影和過濾操作。此外,Delta Join還支持緩存,這有助於減少對外部存儲的請求。

目前,Apache Fluss (Incubating) 源表可以用作Delta Join的源表,可以在Fluss相關文檔查看對應表的定義方式和使用案例。

更多信息

  • Delta Joins
  • Delta Join in Fluss

SQL類型

在 Flink 2.2 版本前,SQL 中定義的ROW類型(例如 SELECT CAST(f AS ROW<i NOT NULL>))會忽略 NOT NULL 約束。這雖然更符合 SQL 標準,但在處理嵌套數據時會導致許多類型不一致和晦澀難懂的錯誤信息。例如,這阻止了在計算列或join key中使用ROW類型。Flink 2.2.0 版本修改了該行為,考慮ROW的可空性。配置項 table.legacy-nested-row-nullability 允許在需要開啓來恢復舊行為,建議更新之前忽略約束的已有查詢。

Flink 2.2.0 轉換為 TIME 類型時會考慮正確的精度(0-3),將不正確的字符串轉換為時間類型(例如,小時部分大於 24)現在會導致運行時異常。BINARY 和 VARBINARY 之間的類型轉換現在會正確考慮長度。

更多信息

  • FLINK-20539
  • FLINK-38181

使用 UniqueKeys 進行狀態管理

Flink 2.2 版本對 StreamingMultiJoinOperator 進行了優化和變更,使用 UniqueKeys 而不是 UpsertKeys 來進行狀態管理。該算子在 Flink 2.1 中以實驗狀態發佈,後續會持續進行優化,這些優化可能會導致不兼容的變化。

更多信息

  • FLINK-38209

Runtime

均衡任務調度

Flink 2.2.0 引入了一種均衡的任務調度策略,以實現任務管理器的任務負載均衡並減少作業瓶頸。

更多信息

  • FLINK-31757
  • FLIP-370

增強HistoryServer工作歷史保留策略

在 Flink 2.2.0 版本前,HistoryServer 僅支持基於數量的作業歸檔保留策略,這不足以滿足需要基於時間保留或組合規則的場景。用户在 Flink 2.2.0 可以使用新的配置項 historyserver.archive.retained-ttl 並結合 historyserver.archive.retained-jobs來滿足更多場景需求。

更多信息

  • FLINK-38229
  • FLIP-490

Metrics

自 Flink 2.2.0 版本起,用户可以為作業中使用的每個算子/轉換分配自定義指標變量。這些變量隨後會被指標報告器轉換為標籤,允許用户為特定運算符的指標添加標籤。例如,您可以使用此功能來命名和區分數據源。

用户現在可以通過 traces.checkpoint.span-detail-level 控制檢查點span的詳細級別。最高級別會報告每個任務和子任務的 span 樹。報告的自定義 span 現在可以包含子 span。更多詳情請參閲Traces。

更多信息

  • FLINK-38158
  • FLINK-38353

Connectors

Scan數據源限流功能

Flink 作業頻繁地與外部系統交換數據,這會消耗網絡帶寬和 CPU 資源。當這些資源稀缺時,過於頻繁地拉取數據可能會干擾其他工作負載,例如 Kafka/MySQL CDC 連接器。在 Flink 2.2 中,我們引入了 RateLimiter 接口,為Scan數據源提供請求速率限制,連接器開發人員可以將其與限流框架集成,以實現自己的限流策略。此功能僅在 DataStream API 中可用。

更多信息

  • FLINK-38497
  • FLIP-535

支持均勻分片

SplitEnumerator 負責分配分片工作,但它無法瞭解這些分片的實際運行時狀態或分佈情況。這使得 SplitEnumerator 無法保證分片均勻分佈,並且極易出現數據傾斜。從 Flink 2.2 開始,SplitEnumerator 獲得了分片分佈信息,並提供了在運行時均勻分配分片的能力。例如,此功能可用於解決 Kafka 連接器中的數據傾斜問題。

更多信息

  • FLINK-38564
  • FLIP-537

其他內容

PyFlink

在 Flink 2.2 中,我們為 Python DataStream API 添加了異步函數支持。這使得 Python 用户能夠在 Flink 作業中高效地查詢外部服務,例如通常部署在獨立 GPU 集羣中的 LLM 服務等。

此外,我們還提供了全面的支持,以確保外部服務訪問的穩定性。一方面,我們支持限制發送到外部服務的併發請求數量,以避免服務過載。另一方面,我們也添加了重試機制,以應對可能由網絡抖動或其他瞬態問題導致的臨時服務不可用情況。

以下是一個簡單的使用示例:

from typing import List
from ollama import AsyncClient

from pyflink.common import Types, Time, Row
from pyflink.datastream import (
    StreamExecutionEnvironment,
    AsyncDataStream,
    AsyncFunction,
    RuntimeContext,
    CheckpointingMode,
)


class AsyncLLMRequest(AsyncFunction[Row, str]):

    def __init__(self, host, port):
        self._host = host
        self._port = port
  
    def open(self, runtime_context: RuntimeContext):
        self._client = AsyncClient(host='{}:{}'.format(self._host, self._port))

    async def async_invoke(self, value: Row) -> List[str]:
        message = {"role": "user", "content": value.question}
        question_id = value.id
        ollam_response = await self._client.chat(model="qwen3:4b", messages=[message])
        return [
            f"Question ID {question_id}: response: {ollam_response['message']['content']}"
        ]

    def timeout(self, value: Row) -> List[str]:
        # return a default value in case timeout
        return [f"Timeout for this question: {value.a}"]


def main(output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(30000, CheckpointingMode.EXACTLY_ONCE)
    ds = env.from_collection(
        [
            ("Who are you?", 0),
            ("Tell me a joke", 1),
            ("Tell me the result of comparing 0.8 and 0.11", 2),
        ],
        type_info=Types.ROW_NAMED(["question", "id"], [Types.STRING(), Types.INT()]),
    )

    result_stream = AsyncDataStream.unordered_wait(
        data_stream=ds,
        async_function=AsyncLLMRequest(),
        timeout=Time.seconds(100),
        capacity=1000,
        output_type=Types.STRING(),
    )

    # define the sink
    result_stream.print()

    # submit for execution
    env.execute()


if __name__ == "__main__":
    main(known_args.output)

更多信息

  • FLINK-38190

升級 commons-lang3 依賴到 3.18.0

將 commons-lang3 從 3.12.0 升級到 3.18.0 以解決 CVE-2025-48924。

更多信息

  • FLINK-38193

protobuf-java 從 3.x 升級到 4.32.1

Flink 2.2 從protobuf-java 3.21.7(Protocol Buffers 版本 21)升級到 protobuf-java 4.32.1(對應 Protocol Buffers 版本 32)。此次升級實現了以下功能:

  • Protobuf 版本支持:完全支持 Protocol Buffers v27 及更高版本中引入的 edition = "2023"edition = "2024" 語法。版本提供了一種統一的方法,將 proto2 和 proto3 的功能與細粒度的特性控制相結合。
  • 改進 Proto3 字段存在性檢查:更好地處理 proto3 中的可選字段,不再受限於舊版 protobuf 的限制,無需將 protobuf.read-default-values 設置為 true 來進行字段存在性檢查。
  • 性能提升:利用了 11 個 Protocol Buffers 版本(版本 22-32)中的性能改進和錯誤修復。
  • 現代 Protobuf 特性:可訪問更新的 protobuf 功能,包括 Edition 2024 特性和改進的運行時行為。

使用 proto2 和 proto3 .proto 文件的用户可以兼容使用,無需更改。

更多信息

  • FLINK-38547

升級注意事項

Flink 社區致力於確保版本升級過程儘可能順暢。但某些變更可能需要用户在升級到 2.2 版本時,對程序的某些部分進行調整。請參閲發版説明以獲取升級過程中需要進行的調整和需要檢查的問題完整列表。

貢獻者列表

Apache Flink 社區衷心感謝所有為此次版本發佈作出貢獻的開發者:

Alan Sheinberg, Aleksandr Iushmanov, AlexYinHan, Arvid Heise, CuiYanxiang, David Hotham, David Radley, Dawid Wysakowicz, Dian Fu, Etienne Chauchot, Ferenc Csaky, Gabor Somogyi, Gustavo de Morais, Hang Ruan, Hao Li, Hongshun Wang, Jackeyzhe, Jakub Stejskal, Jiaan Geng, Jinkun Liu, Juntao Zhang, Kaiqi Dong, Khaled Hammouda, Kumar Mallikarjuna, Kunni, Laffery, Mario Petruccelli, Martijn Visser, Mate Czagany, Maximilian Michels, Mika Naylor, Mingliang Liu, Myracle, Naci Simsek, Natea Eshetu Beshada, Niharika Sakuru, Pan Yuepeng, Piotr Nowojski, Poorvank,Ramin Gharib, Roc Marshal, Roman Khachatryan, Ron, Rosa Kang, Rui Fan, Sergey Nuyanzin, Shengkai, Stefan Richter, Stepan Stepanishchev, Swapnil Aher, Timo Walther, Xingcan Cui, Xuyang, Yuepeng Pan, Yunfeng Zhou, Zakelly, Zhanghao Chen, dylanhz, gong-flying, hejufang, lincoln lee, lincoln-lil, mateczagany, morvenhuang, noorall, r-sidd, sxnan, voonhous, xia rui, xiangyu0xf, yangli1206, yunfengzhou-hub, zhou

更多內容


活動推薦

複製下方鏈接或者掃描二維碼
即可快速體驗 “一體化的實時數倉聯合解決方案”
瞭解活動詳情:https://www.aliyun.com/solution/tech-solution/flink-hologres

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.