博客 / 詳情

返回

如何在 Kuscia 上運行 SCQL 聯合分析任務

打開鏈接即可點亮社區Star,照亮技術的前進之路。

Github 地址:https://github.com/secretflow/kuscia

本教程將以 KusciaAPI 創建本地數據源作為示例,介紹如何在 Kuscia 上運行 SCQL 聯合分析任務。

準備節點

  • 體驗部署請選擇快速入門。
  • 生產部署請選擇多機部署。

本示例在點對點組網模式下完成。在中心化組網模式下,證書的配置會有所不同。

{#cert-and-token}

獲取 KusciaAPI 證書和 Token

在下面準備數據步驟中需要使用到 KusciaAPI,如果 KusciaAPI 啓用了 MTLS 協議,則需要提前準備好 MTLS 證書和 Token。協議參考這裏。

點對點組網模式

證書的配置參考配置授權

這裏以 Alice 節點為例,接口需要的證書文件在 ${USER}-kuscia-autonomy-alice 節點的 /home/kuscia/var/certs/ 目錄下:

文件名 文件功能
kusciaapi-server.key 服務端私鑰文件
kusciaapi-server.crt 服務端證書文件
ca.crt CA 證書文件
token 認證 Token ,在 headers 中添加 Token: { token 文件內容}

中心化組網模式

證書文件在 ${USER}-kuscia-master 節點的 /home/kuscia/var/certs/ 目錄下:

文件名 文件功能
kusciaapi-server.key 服務端私鑰文件
kusciaapi-server.crt 服務端證書文件
ca.crt CA 證書文件
token 認證 Token ,在 headers 中添加 Token: { token 文件內容}

準備數據

您可以使用本文示例的測試數據文件,或者使用您自己的數據文件。

在 Kuscia 中,在節點容器的 /home/kuscia/var/storage 目錄存放內置測試數據文件,下面 Alice 和 Bob 節點分別使用的是 scql-alice.csv 和 scql-bob.csv,您可以在容器中查看這兩個數據文件。

準備測試數據

Alice 準備測試數據
  1. 這裏以 Docker 部署模式為例,登錄到 alice 節點中

    docker exec -it ${USER}-kuscia-autonomy-alice bash
  2. 創建 DomainDataSource

    下面 datasource_id 名稱以 scql-demo-local-datasource 為例:

    export CTR_CERTS_ROOT=/home/kuscia/var/certs
    curl -k -X POST 'https://localhost:8082/api/v1/domaindatasource/create' \
     --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
     --header 'Content-Type: application/json' \
     --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
     --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
     --cacert ${CTR_CERTS_ROOT}/ca.crt \
     -d '{
      "domain_id": "alice",
      "datasource_id":"scql-demo-local-datasource",
      "type":"localfs",
      "name": "DemoDataSource",
      "info": {
          "localfs": {
              "path": "/home/kuscia/var/storage/data"
          }
      },
      "access_directly": true
    }'

    :::{tip}
    K8S RunK 模式部署 Kuscia 時,此處需要使用 OSS 數據源,並將 /home/kuscia/var/storage/data/scql-alice.csv 示例數據放入 OSS 中。
    :::

  3. 創建 DomainData

    下面 domaindata_id 名稱以 scql-alice-table 為例:

    export CTR_CERTS_ROOT=/home/kuscia/var/certs
    curl -k -X POST 'https://localhost:8082/api/v1/domaindata/create' \
     --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
     --header 'Content-Type: application/json' \
     --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
     --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
     --cacert ${CTR_CERTS_ROOT}/ca.crt \
     -d '{
      "domain_id": "alice",
      "domaindata_id": "scql-alice-table",
      "datasource_id": "scql-demo-local-datasource",
      "name": "alice001",
      "type": "table",
      "relative_uri": "scql-alice.csv",
      "columns": [
        {
          "name": "ID",
          "type": "str"
        },
        {
          "name": "credit_rank",
          "type": "int"
        },
        {
          "name": "income",
          "type": "int"
        },
        {
          "name": "age",
          "type": "int"
        }
      ]
    }'
Bob 準備測試數據
  1. 這裏以 Docker 部署模式為例,登錄到 Bob 節點中

    docker exec -it ${USER}-kuscia-autonomy-bob bash
  2. 創建 DomainDataSource

    下面 datasource_id 名稱以 scql-demo-local-datasource 為例:

    export CTR_CERTS_ROOT=/home/kuscia/var/certs
    curl -k -X POST 'https://localhost:8082/api/v1/domaindatasource/create' \
     --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
     --header 'Content-Type: application/json' \
     --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
     --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
     --cacert ${CTR_CERTS_ROOT}/ca.crt \
     -d '{
      "domain_id": "bob",
      "datasource_id":"scql-demo-local-datasource",
      "type":"localfs",
      "name": "DemoDataSource",
      "info": {
          "localfs": {
              "path": "/home/kuscia/var/storage/data"
          }
      },
      "access_directly": true
    }'

    :::{tip}
    K8S RunK 模式部署 Kuscia 時,此處需要使用 OSS 數據源,並將 /home/kuscia/var/storage/data/scql-bob.csv 示例數據放入 OSS 中。
    :::

  3. 創建 DomainData

    下面 domaindata_id 名稱以 scql-bob-table 為例:

    export CTR_CERTS_ROOT=/home/kuscia/var/certs
    curl -k -X POST 'https://localhost:8082/api/v1/domaindata/create' \
     --header "Token: $(cat ${CTR_CERTS_ROOT}/token)" \
     --header 'Content-Type: application/json' \
     --cert ${CTR_CERTS_ROOT}/kusciaapi-server.crt \
     --key ${CTR_CERTS_ROOT}/kusciaapi-server.key \
     --cacert ${CTR_CERTS_ROOT}/ca.crt \
     -d '{
      "domain_id": "bob",
      "domaindata_id": "scql-bob-table",
      "datasource_id": "scql-demo-local-datasource",
      "name": "bob001",
      "type": "table",
      "relative_uri": "scql-bob.csv",
      "columns": [
        {
          "name": "ID",
          "type": "str"
        },
        {
          "name": "order_amount",
          "type": "int"
        },
        {
          "name": "is_active",
          "type": "int"
        }
      ]
    }'

部署 SCQL

Alice 部署 SCQL

  1. 登陸到 alice 節點容器中

    docker exec -it ${USER}-kuscia-autonomy-alice bash

    如果是中心化組網模式,則需要登錄到 master 節點容器中。

    docker exec -it ${USER}-kuscia-master bash
  2. 獲取 SCQL 應用的鏡像模版 AppImage

    從 SCQL 官方文檔中,獲取 AppImage 具體內容,並將其內容保存到 scql-image.yaml 文件中。 具體模版內容,可參考 SCQL AppImage。

    注意:

    1. 如果 secretflow/scql 倉庫訪問網速較慢,可以替換為 secretflow-registry.cn-hangzhou.cr.aliyuncs.com/secretflow/scql
    2. 請刪除 #--datasource_router=kusciadatamesh 代碼行前面的 # 符號,以啓用 Datamesh 本地數據源配置。
    3. engineConf 字段加上 --enable_restricted_read_path=false 限制 csv 文件的讀取路徑。
    4. K8S RunK 模式部署 Kuscia 時,需要使用 MySQL 存儲 Broker 元數據。修改 storage 字段的 type 為 MySQL 和 conn_str 對應的數據庫連接字符串。
    5. 如果 AppImage 配置有改動可以重啓 Kuscia 或重新創建 Broker 使配置生效。示例命令:kubectl delete KusciaDeployment scql -n cross-domain kubectl apply -f broker-deploy.yaml
  3. 創建 SCQL 應用的鏡像模版 AppImage
kubectl apply -f scql-image.yaml
  1. 部署 Broker
kubectl apply -f /home/kuscia/scripts/templates/scql/broker_alice.yaml

Bob 部署 SCQL

  1. 登陸到 Bob 節點容器中

    docker exec -it ${USER}-kuscia-autonomy-bob bash

    如果是中心化組網模式,則需要登錄到 master 節點容器中。

  2. docker exec -it ${USER}-kuscia-master bash
  3. 獲取 SCQL 應用的鏡像模版 AppImage

    從 SCQL 官方文檔中,獲取 AppImage 具體內容,並將其內容保存到 scql-image.yaml 文件中。 具體模版內容,可參考 SCQL AppImage。

    注意:

    1. 如果 secretflow/scql 倉庫訪問網速較慢,可以替換為 secretflow-registry.cn-hangzhou.cr.aliyuncs.com/secretflow/scql
    2. 請刪除 #--datasource_router=kusciadatamesh 代碼行前面的 # 符號,以啓用 Datamesh 本地數據源配置。
    3. engineConf 字段加上 --enable_restricted_read_path=false 限制 csv 文件的讀取路徑。
    4. K8S RunK 模式部署 Kuscia 時,需要使用 MySQL 存儲 Broker 元數據。修改 storage 字段的 type 為 MySQL 和 conn_str 對應的數據庫連接字符串。
    5. 如果 AppImage 配置有改動可以重啓 Kuscia 或重新創建 Broker 使配置生效。示例命令:kubectl delete KusciaDeployment scql -n cross-domain kubectl apply -f broker-deploy.yaml
  4. 創建 SCQL 應用的鏡像模版 AppImage

    kubectl apply -f appimage.yaml
  5. 部署 Broker

    kubectl apply -f /home/kuscia/scripts/templates/scql/broker_bob.yaml

    查看 broker 是否部署成功

    下面以 Alice 節點為例,Bob 節點類似

    docker exec -it ${USER}-kuscia-autonomy-alice kubectl get po -A

When the Pod status is Running, it indicates that the deployment was successful:

NAMESPACE NAME READY STATUS RESTARTS AGE
alice scql-broker-6f4f85b64f-fsgq8 1/1 Running 0 2m42s


## 使用 SCQL 進行聯合分析

下面僅以流程步驟作為示例展示,更多接口參數請參考 [SCQL API](https://www.secretflow.org.cn/zh-CN/docs/scql/main/reference/broker-api)。

### 創建項目並邀請參與方加入

#### Alice 創建項目,並邀請 Bob 加入

1. 登錄到 Alice 節點容器中
   

docker exec -it ${USER}-kuscia-autonomy-alice bash

2. 創建項目

下面項目名稱以 "demo" 為例:

curl -X POST http://127.0.0.1:80/intra/project/create \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-d '{

   "project_id":"demo",
   "name":"demo",
   "conf":{
       "spu_runtime_cfg":{
       "protocol":"SEMI2K",
       "field":"FM64"
       }
   },
  "description":"this is a project"

}'

3. 查看項目

curl -X POST http://127.0.0.1:80/intra/project/list \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice"

4. 邀請 Bob 加入到 "demo" 項目中

curl -X POST http://127.0.0.1:80/intra/member/invite \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-d '{

   "invitee": "bob",
   "project_id": "demo"

}'

5. 查看邀請狀態

curl -X POST http://127.0.0.1:80/intra/invitation/list \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice"


#### Bob 接受邀請

1. 登錄到 Bob 節點容器中

docker exec -it ${USER}-kuscia-autonomy-bob bash

2. Bob 接受 Alice 的入項邀請

curl -X POST http://127.0.0.1:80/intra/invitation/process \
--header "host: scql-broker-intra.bob.svc" \
--header "kuscia-source: bob" \
-d '{

   "invitation_id":1,
   "respond":0

}'


### 創建數據表

#### Alice 創建數據表

1. 登錄到 Alice 節點容器中

docker exec -it ${USER}-kuscia-autonomy-alice bash

2. 創建數據表

> 下面 table_name 以 ta 為例,ref_table 參數的值為[創建 DomainData](./run_scql_on_kuscia_cn.md#alice-準備測試數據)時的 `domaindata_id`

curl -X POST http://127.0.0.1:80/intra/table/create \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo",
"table_name": "ta",
"ref_table": "scql-alice-table",
"db_type": "csvdb",
"columns": [
    {"name":"ID","dtype":"string"},
    {"name":"credit_rank","dtype":"int"},
    {"name":"income","dtype":"int"},
    {"name":"age","dtype":"int"}
]

}'


#### Bob 創建數據表

1. 登錄到 Bob 節點容器中
   

docker exec -it ${USER}-kuscia-autonomy-bob bash

2. 創建數據表

> 下面 table_name 以 ta 為例,ref_table 參數的值為[創建 DomainData](./run_scql_on_kuscia_cn.md#bob-準備測試數據)時的 `domaindata_id`

curl -X POST http://127.0.0.1:80/intra/table/create \
--header "host: scql-broker-intra.bob.svc" \
--header "kuscia-source: bob" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo",
"table_name": "tb",
"ref_table": "scql-bob-table",
"db_type": "csvdb",
"columns": [
    {"name":"ID","dtype":"string"},
    {"name":"order_amount","dtype":"double"},
    {"name":"is_active","dtype":"int"}
]

}'


### 查看數據表

下面以 Alice 為例,Bob 節點類似

curl -X POST http://127.0.0.1:80/intra/table/list \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo"

}'


### 刪除數據表

若想刪除創建的數據表時,可以參考下面命令。以 Alice 節點為例,Bob 節點類似。

curl -X POST http://127.0.0.1:80/intra/table/drop \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo",
"table_name":"ta"

}'


### 數據表授權

#### Alice 的數據表授權

1. 將 ta 數據表授權給 Alice
   

curl -X POST http://127.0.0.1:80/intra/ccl/grant \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

   "project_id": "demo",
   "column_control_list":[
   {"col":{"column_name":"ID","table_name":"ta"},"party_code":"alice","constraint":1},
   {"col":{"column_name":"age","table_name":"ta"},"party_code":"alice","constraint":1},
   {"col":{"column_name":"income","table_name":"ta"},"party_code":"alice","constraint":1},
   {"col":{"column_name":"credit_rank","table_name":"ta"},"party_code":"alice","constraint":1}
   ]

}'

2. 將 ta 表授權給 Bob 節點

curl -X POST http://127.0.0.1:80/intra/ccl/grant \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

   "project_id": "demo",
   "column_control_list":[
   {"col":{"column_name":"ID","table_name":"ta"},"party_code":"bob","constraint":1},
   {"col":{"column_name":"age","table_name":"ta"},"party_code":"bob","constraint":1},
   {"col":{"column_name":"income","table_name":"ta"},"party_code":"bob","constraint":1},
   {"col":{"column_name":"credit_rank","table_name":"ta"},"party_code":"bob","constraint":1}
   ]

}'


#### Bob 的數據表授權

1. 將 tb 表授權給 Alice 節點

curl -X POST http://127.0.0.1:80/intra/ccl/grant \
--header "host: scql-broker-intra.bob.svc" \
--header "kuscia-source: bob" \
-H "Content-Type: application/json" \
-d '{

     "project_id": "demo",
     "column_control_list":[
     {"col":{"column_name":"ID","table_name":"tb"},"party_code":"alice","constraint":1},
     {"col":{"column_name":"is_active","table_name":"tb"},"party_code":"alice","constraint":1},
     {"col":{"column_name":"order_amount","table_name":"tb"},"party_code":"alice","constraint":1}
     ]

}'

2. 將 tb 表授權給 Bob 節點

curl -X POST http://127.0.0.1:80/intra/ccl/grant \
--header "host: scql-broker-intra.bob.svc" \
--header "kuscia-source: bob" \
-H "Content-Type: application/json" \
-d '{

   "project_id": "demo",
   "column_control_list":[
   {"col":{"column_name":"ID","table_name":"tb"},"party_code":"bob","constraint":1},
   {"col":{"column_name":"is_active","table_name":"tb"},"party_code":"bob","constraint":1},
   {"col":{"column_name":"order_amount","table_name":"tb"},"party_code":"bob","constraint":1}
   ]

}'


### 查看數據表授權

下面以 Alice 為例,Bob 節點類似

curl -X POST http://127.0.0.1:80/intra/ccl/show \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo",
"tables":["ta"],
"dest_parties":["alice"]

}'


### 撤銷數據表授權

若想撤銷數據表授權,那麼可以參考下面命令。以 Alice 節點為例,Bob 節點類似。

curl -X POST http://127.0.0.1:80/intra/ccl/revoke \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo",
"column_control_list":[
{"col":{"column_name":"ID","table_name":"ta"},"party_code":"alice","constraint":1},
{"col":{"column_name":"age","table_name":"ta"},"party_code":"alice","constraint":1},
{"col":{"column_name":"income","table_name":"ta"},"party_code":"alice","constraint":1},
{"col":{"column_name":"credit_rank","table_name":"ta"},"party_code":"alice","constraint":1}
]

}'


### 進行聯合分析

#### 同步查詢

下面以 Alice 節點查詢為例 Bob 節點類似。

curl -X POST http://127.0.0.1:80/intra/query \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

"project_id": "demo",
"query":"SELECT ta.credit_rank, COUNT(*) as cnt, AVG(ta.income) as avg_income, AVG(tb.order_amount) as avg_amount FROM ta INNER JOIN tb ON ta.ID = tb.ID WHERE ta.age >= 20 AND ta.age <= 30 AND tb.is_active=1 GROUP BY ta.credit_rank;"

}'


返回的成功結果如下:

{

"status": {
    "code": 0,
    "message": "",
    "details": []
},
"affected_rows": "0",
"warnings": [],
"cost_time_s": 7.171298774,
"out_columns": [{
    "name": "credit_rank",
    "shape": {
        "dim": [{
            "dim_value": "2"
        }, {
            "dim_value": "1"
        }]
    },
    "elem_type": "INT64",
    "option": "VALUE",
    "annotation": {
        "status": "TENSORSTATUS_UNKNOWN"
    },
    "int32_data": [],
    "int64_data": ["6", "5"],
    "float_data": [],
    "double_data": [],
    "bool_data": [],
    "string_data": [],
    "ref_num": 0
}, {
    "name": "cnt",
    "shape": {
        "dim": [{
            "dim_value": "2"
        }, {
            "dim_value": "1"
        }]
    },
    "elem_type": "INT64",
    "option": "VALUE",
    "annotation": {
        "status": "TENSORSTATUS_UNKNOWN"
    },
    "int32_data": [],
    "int64_data": ["3", "1"],
    "float_data": [],
    "double_data": [],
    "bool_data": [],
    "string_data": [],
    "ref_num": 0
}, {
    "name": "avg_income",
    "shape": {
        "dim": [{
            "dim_value": "2"
        }, {
            "dim_value": "1"
        }]
    },
    "elem_type": "FLOAT64",
    "option": "VALUE",
    "annotation": {
        "status": "TENSORSTATUS_UNKNOWN"
    },
    "int32_data": [],
    "int64_data": [],
    "float_data": [],
    "double_data": [438000, 30070],
    "bool_data": [],
    "string_data": [],
    "ref_num": 0
}, {
    "name": "avg_amount",
    "shape": {
        "dim": [{
            "dim_value": "2"
        }, {
            "dim_value": "1"
        }]
    },
    "elem_type": "FLOAT64",
    "option": "VALUE",
    "annotation": {
        "status": "TENSORSTATUS_UNKNOWN"
    },
    "int32_data": [],
    "int64_data": [],
    "float_data": [],
    "double_data": [4060.6666666666665, 3598],
    "bool_data": [],
    "string_data": [],
    "ref_num": 0
}]

}


#### 異步查詢

下面以 Alice 節點為例,Bob 節點類似。

1. 提交 query
   

curl -X POST http://127.0.0.1:80/intra/query/submit \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

   "project_id": "demo",
   "query":"SELECT ta.credit_rank, COUNT(*) as cnt, AVG(ta.income) as avg_income, AVG(tb.order_amount) as avg_amount FROM ta INNER JOIN tb ON ta.ID = tb.ID WHERE ta.age >= 20 AND ta.age <= 30 AND tb.is_active=1 GROUP BY ta.credit_rank;"

}'

2. 獲取結果

curl -X POST http://127.0.0.1:80/intra/query/fetch \
--header "host: scql-broker-intra.alice.svc" \
--header "kuscia-source: alice" \
-H "Content-Type: application/json" \
-d '{

     "job_id":"3c4723fb-9afa-11ee-8934-0242ac12000"

}'


## 參考

### 常用命令

查看 broker kd 狀態:

docker exec -it ${USER}-kuscia-autonomy-alice kubectl get kd -n cross-domain


查看 broker deployment 狀態

docker exec -it ${USER}-kuscia-autonomy-alice kubectl get deployment -A


查看 broker 應用狀態

docker exec -it ${USER}-kuscia-autonomy-alice kubectl get po -A


查看 broker configmap

docker exec -it ${USER}-kuscia-autonomy-alice kubectl get cm scql-broker-configtemplate -n alice -oyaml


查看 appImage

docker exec -it ${USER}-kuscia-autonomy-alice kubectl get appimage


刪除 broker

docker exec -it ${USER}-kuscia-autonomy-alice kubectl delete kd scql -n cross-domain


### 如何查看 SCQL 應用容器日誌

在 Kuscia 中,可以登陸到節點容器內查看 SCQL 應用容器的日誌。具體方法如下。

1. 登陸到節點容器中
   
   下面以 Alice 節點為例:
   

docker exec -it ${USER}-kuscia-autonomy-alice bash

2. 查看日誌

在目錄 `/home/kuscia/var/stdout/pods` 下可以看到對應 SCQL Broker 和 Engine 應用容器的目錄。後續進入到相應目錄下,即可查看應用的日誌。

# View the current application container's directory
ls /home/kuscia/var/stdout/pods

# View the application container's logs, example as follows:
cat /home/kuscia/var/stdout/pods/alice_xxxx_engine_xxxx/secretflow/0.log
cat /home/kuscia/var/stdout/pods/alice_xxxx_broker_xxxx/secretflow/0.log

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.