1、安裝pipeline

docker run -d -p 9099:9099 --add-host=host.docker.internal:host-gateway -v pipelines:/app/pipelines --name pipelines --restart always ghcr.io/open-webui/pipelines:main

2、對接OpenWebUI(密碼:0p3n-w3bu!)

Web API中使用Dependency Resolver_#OpenWebUI

3、進入pipeline容器配置

只需幾個簡單的步驟即可開始使用 Pipelines:

1、確保已安裝 Python 3.11。它是唯一官方支持的 Python 版本
2、克隆管道存儲庫
git clone https://github.com/open-webui/pipelines.git
cd pipelines
3、安裝所需的依賴項
pip install -r requirements.txt
4、啓動 Pipelines 服務器
sh ./start.sh

服務器運行後,將客户端上的 OpenAI URL 設置為 Pipelines URL。這解鎖了 Pipelines 的全部功能,集成了任何 Python 庫並創建了適合您需求的自定義工作流程。

4、上傳dify的python腳本(url,key,輸入輸出變量名都需要更改)

#####塊輸出
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import requests, json, warnings

# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')

class Pipeline:
    def __init__(self):
        self.name = "IT_AI智能知識平台"  # 平台名稱
        self.api_url = "http://10.160.8.210/v1/workflows/run"  # Dify API 地址
        self.api_key = "app-raqQgGRuLSCKxatmFy8S0JmD"          # Dify API Key
        self.api_request_stream = True                         # 啓用流式響應(但僅取最終結果)
        self.verify_ssl = True                                 # 自部署 Dify 可設為 False
        self.debug = True                                      # 開啓調試日誌(必須保留!)
    
    async def on_startup(self):
        print(f"on_startup: {self.name} 初始化完成")
    
    async def on_shutdown(self): 
        print(f"on_shutdown: {self.name} 已停止")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        if self.debug:
            print(f"inlet: body={body}, user={user}")
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        if self.debug:
            print(f"outlet: body={body}, user={user}")
        return body

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
        print(f"pipe: {self.name} - 開始處理用户消息")
        
        if self.debug:
            print(f"pipe: 用户輸入: {user_message}")
            print(f"pipe: body內容: {body}")

        # 1. 構建請求參數
        response_mode = "streaming" if self.api_request_stream else "blocking"
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }
        data = {
            "inputs": {"user_message_text": user_message},  # Dify 輸入變量名(與工作流定義一致)
            "response_mode": response_mode,
            "user": body.get("user", {}).get("email", "default_user")
        }

        # 2. 發送請求到 Dify
        try:
            response = requests.post(
                self.api_url,
                headers=headers,
                json=data,
                stream=self.api_request_stream,
                verify=self.verify_ssl
            )

            # 3. 處理響應(核心修正:僅返回最終完整文本)
            if response.status_code == 200:
                full_text = ""
                final_output = False  # 標記是否已獲取最終結果
                for line in response.iter_lines():
                    if not line or final_output:
                        continue  # 空行或已獲取最終結果,跳過
                    line = line.decode('utf-8').strip()
                    if self.debug:
                        print(f"pipe: Dify 原始響應行: {line}")

                    # 解析流式響應(僅處理 workflow_finished 事件)
                    if line.startswith("data: "):
                        json_str = line[len("data: "):]
                        try:
                            json_data = json.loads(json_str)
                            # 關鍵:僅處理最終完成事件(event: workflow_finished)
                            if json_data.get("event") == "workflow_finished":
                                outputs = json_data.get("data", {}).get("outputs", {})
                                full_text = outputs.get("summary", "")  # Dify 輸出變量名(summary)
                                if full_text:
                                    final_output = True  # 標記已獲取最終結果
                                    yield full_text  # 僅返回一次完整文本
                                    break  # 退出循環,避免重複處理
                        except json.JSONDecodeError:
                            error_msg = f"[JSON解析錯誤] {json_str[:100]}"
                            print(f"pipe: {error_msg}")
                            yield error_msg
                # 容錯:若未觸發 workflow_finished,但有累積文本(極端情況)
                if not final_output and full_text:
                    yield full_text
            else:
                error_details = response.text[:500]
                yield f"請求失敗: 狀態碼 {response.status_code},詳情: {error_details}"
        
        except Exception as e:
            yield f"執行錯誤: {str(e)}"
#####流式輸出
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import requests, json, warnings

# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')

class Pipeline:
    def __init__(self):
        self.name = "IT_AI智能知識平台"  # 平台名稱
        self.api_url = "http://10.160.8.210/v1/workflows/run"  # Dify API 地址
        self.api_key = "app-N5aBftQwzEvbdla0KhAljB5E"          # Dify API Key
        self.api_request_stream = True                         # 啓用流式響應
        self.verify_ssl = True                                 # 自部署 Dify 可設為 False
        self.debug = True                                      # 開啓調試日誌(必須保留!)
    
    async def on_startup(self):
        print(f"on_startup: {self.name} 初始化完成")
    
    async def on_shutdown(self): 
        print(f"on_shutdown: {self.name} 已停止")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        if self.debug:
            print(f"inlet: body={body}, user={user}")
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        if self.debug:
            print(f"outlet: body={body}, user={user}")
        return body

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
        print(f"pipe: {self.name} - 開始處理用户消息")
        
        if self.debug:
            print(f"pipe: 用户輸入: {user_message}")
            print(f"pipe: body內容: {body}")

        # 1. 構建請求參數
        response_mode = "streaming" if self.api_request_stream else "blocking"
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }
        data = {
            "inputs": {"user_message_text": user_message},  # Dify 輸入變量名(與工作流定義一致)
            "response_mode": response_mode,
            "user": body.get("user", {}).get("email", "default_user")
        }

        # 2. 發送請求到 Dify
        try:
            response = requests.post(
                self.api_url,
                headers=headers,
                json=data,
                stream=self.api_request_stream,
                verify=self.verify_ssl
            )

            # 3. 處理流式響應
            if response.status_code == 200:
                for line in response.iter_lines():
                    if not line:  # 跳過空行
                        continue
                    
                    line = line.decode('utf-8').strip()
                    if self.debug:
                        print(f"pipe: Dify 原始響應行: {line}")

                    # 只處理包含"data: "的行
                    if line.startswith("data: "):
                        json_str = line[len("data: "):]
                        try:
                            json_data = json.loads(json_str)
                            
                            # 處理text_chunk事件
                            if json_data.get("event") == "text_chunk":
                                chunk = json_data.get("data", {}).get("text", "")
                                if chunk:
                                    yield chunk  # 實時輸出每個文本塊
                            
                            # 可選:處理workflow_finished事件作為結束標誌
                            elif json_data.get("event") == "workflow_finished" and self.debug:
                                print("pipe: 工作流已完成")
                            
                        except json.JSONDecodeError:
                            error_msg = f"[JSON解析錯誤] {json_str[:100]}"
                            print(f"pipe: {error_msg}")
                            yield error_msg
            else:
                error_details = response.text[:500]
                yield f"請求失敗: 狀態碼 {response.status_code},詳情: {error_details}"
        
        except requests.exceptions.RequestException as e:
            yield f"請求異常: {str(e)}"
        except Exception as e:
            yield f"執行錯誤: {str(e)}"

5、將上述腳本上傳到下圖所在位置

Web API中使用Dependency Resolver_#dify_02

另:

這些示例展示瞭如何集成不同的功能,為構建您自己的自定義管道奠定基礎。https://github.com/open-webui/pipelines/blob/main/examples