前言
- 技術棧
  Python 3.11.8
  websockets 15.0.1
  aliyun-python-sdk-core 2.16.0
  nls 1.0.0
- 截至 2025.3.13,nls.NlsSpeechTranscriber 不支持異步調用,它的回調函數都是同步函數
- 因此在回調函數中使用 asyncio.run 或 loop.create_task 將異步調用轉化為同步調用
- 後文中,client 發送的是字節流,server 返回的是字符串;為與各自的數據類型保持一致,定義了兩種格式的結束符
  b'$END$'   client -> server
  '$END$'    server -> client
材料準備
- 從 GitHub 下載 nls 目錄和 tests 目錄裏面的 test1.pcm 文件;client 代碼讀取的路徑是 ./audio/test1.pcm,所以需要把 test1.pcm 放到運行目錄下的 audio 子目錄中
- 將 nls 目錄放到 site-packages 目錄下,相當於安裝 nls 庫
示例代碼
-
# encoding: utf-8
# author: qbit
# date: 2025-03-13
# summary: server-side code, sync (synchronous) version -- a WebSocket server
#          that receives speech recognition requests, calls Alibaba Cloud and
#          returns the recognised text
import time
import json
import nls
from websockets.sync.server import serve

URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
# See https://help.aliyun.com/document_detail/450255.html for how to get a token
TOKEN = "your_token"
# Get the Appkey from the console: https://nls-portal.console.aliyun.com/applist
APPKEY = "your_key"


class FuncCallback:
    r"""Callback functions required by nls.NlsSpeechTranscriber.

    Every callback receives the entries of ``callback_args`` as ``*args``;
    the first entry is the client WebSocket connection.
    """

    def __init__(self):
        self.result = ""  # most recent recognition text
        self.state = ""  # name of the most recent state-changing event

    def _record(self, message, state):
        r"""Store the text found at payload.result of `message`, then `state`."""
        payload = json.loads(message)["payload"]
        self.result = payload["result"]
        self.state = state

    def on_sentence_begin(self, message, *args):
        print("on_sentence_begin")

    def on_sentence_end(self, message, *args):
        self._record(message, "sentence_end")
        print(f"on_sentence_end: {self.result}")
        client = args[0]
        client.send(self.result)

    def on_start(self, message, *args):
        print("on_start")

    def on_error(self, message, *args):
        print(f"on_error from aliyun {message}")
        self.state = "error"

    def on_close(self, *args):
        print("on_close")

    def on_result_chg(self, message, *args):
        self._record(message, "result_chg")
        print(f"on_result_chg: {self.result}")
        client = args[0]
        client.send(self.result)

    def on_completed(self, message, *args):
        print("on_completed")
        self.state = "completed"


def asr(websocket):
    r"""Speech recognition service, Automatic Speech Recognition (ASR).

    Forwards every message received from the client to the transcriber until
    the b"$END$" marker arrives, stops the transcriber, waits until a callback
    has reported "completed" or "error", and finally sends "$END$" back.
    """
    peer = websocket.remote_address
    print(f"Client connected: {peer[0]}:{peer[1]}")

    events = FuncCallback()
    handlers = {
        "on_sentence_begin": events.on_sentence_begin,
        "on_sentence_end": events.on_sentence_end,
        "on_start": events.on_start,
        "on_result_changed": events.on_result_chg,
        "on_completed": events.on_completed,
        "on_error": events.on_error,
        "on_close": events.on_close,
    }
    transcriber = nls.NlsSpeechTranscriber(
        url=URL,
        token=TOKEN,
        appkey=APPKEY,
        callback_args=[websocket],
        **handlers,
    )
    transcriber.start(
        aformat="pcm",  # supported formats: pcm, opu, opus
        enable_intermediate_result=True,
        enable_punctuation_prediction=True,
        enable_inverse_text_normalization=True,
    )

    for chunk in websocket:
        if chunk == b"$END$":
            break
        transcriber.send_audio(chunk)
    transcriber.stop()

    terminal_states = ("completed", "error")
    while events.state not in terminal_states:
        time.sleep(0.1)
        print(f"callback.state: {events.state}")
    websocket.send("$END$")  # send the custom end marker


def main():
    r"""Start the WebSocket server on 127.0.0.1:8765 and serve forever."""
    host, port = "127.0.0.1", 8765
    with serve(asr, host, port) as server:
        print(f"Server started {host}:{port}...")
        server.serve_forever()


if __name__ == "__main__":
    main()
client 端代碼
# encoding: utf-8
# author: qbit
# date: 2025-03-13
# summary: WebSocket client that simulates a real-time speech request
import threading

from websockets.sync.client import connect

AUDIO_PATH = "./audio/test1.pcm"
CHUNK_SIZE = 6400  # bytes of PCM audio sent per WebSocket message
END_OF_AUDIO = b"$END$"  # custom end marker, client -> server (bytes)
END_OF_RESULTS = "$END$"  # custom end marker, server -> client (str)


def send(websocket, path=AUDIO_PATH, chunk_size=CHUNK_SIZE):
    r"""Stream a PCM file to the server, then send the end marker.

    Args:
        websocket: open connection; only its ``send`` method is used.
        path: audio file to stream.
        chunk_size: maximum number of bytes per message.
    """
    with open(path, "rb") as f:
        # Read fixed-size chunks straight from the file. The last chunk may be
        # shorter than chunk_size and must still be sent, otherwise the tail
        # of the audio never reaches the recogniser.
        while chunk := f.read(chunk_size):
            websocket.send(chunk)
    websocket.send(END_OF_AUDIO)


def recv(websocket):
    r"""Print every message from the server until the end marker arrives."""
    while True:
        message = websocket.recv()
        print(f"Received message: {message}")
        if message == END_OF_RESULTS:
            break


if __name__ == "__main__":
    uri = "ws://localhost:8765"
    print(f"Connecting to {uri}...")
    with connect(uri) as websocket:
        # Send and receive concurrently so intermediate results are printed
        # while the audio is still being uploaded.
        sender = threading.Thread(target=send, args=(websocket,))
        receiver = threading.Thread(target=recv, args=(websocket,))
        sender.start()
        receiver.start()
        sender.join()
        receiver.join()
運行與輸出
- 先啓動 server 代碼,再啓動 client 代碼
-
server 輸出
Server started 127.0.0.1:8765...
Client connected: 127.0.0.1:53387
on_start
on_sentence_begin
on_result_chg: 因
on_result_chg: 一
on_result_chg: 12
on_result_chg: 123
on_result_chg: 1234
on_result_chg: 12345
on_result_chg: 123456
on_result_chg: 1234567
on_result_chg: 12345678
on_result_chg: 1 2 3 4 5 6 7 8 9 10
on_sentence_end: 1 2 3 4 5 6 7 8 9 10。
on_completed
on_close
-
client 輸出
Connecting to ws://localhost:8765...
Received message: 因
Received message: 一
Received message: 12
Received message: 123
Received message: 1234
Received message: 12345
Received message: 123456
Received message: 1234567
Received message: 12345678
Received message: 1 2 3 4 5 6 7 8 9 10
Received message: 1 2 3 4 5 6 7 8 9 10。
Received message: $END$
async 異步版 Server 代碼
- 本節的異步版本代碼,可以與 fastapi.WebSocket 結合使用
-
# encoding: utf-8
# author: qbit
# date: 2025-03-13
# summary: async server, blocking-callback variant -- a WebSocket server that
#          receives speech recognition requests, calls Alibaba Cloud and
#          returns the recognised text. Each SDK callback blocks until its
#          result has been sent to the client.
import asyncio
import json

import nls
from websockets.asyncio.server import serve

URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
# See https://help.aliyun.com/document_detail/450255.html for how to get a token
TOKEN = "your_token"
# Get the Appkey from the console: https://nls-portal.console.aliyun.com/applist
APPKEY = "your_key"
SEND_TIMEOUT = 10  # seconds a callback waits for one result to be sent


class FuncCallback:
    r"""Callback functions required by nls.NlsSpeechTranscriber.

    The callbacks are plain synchronous functions. They receive the entries
    of ``callback_args`` as ``*args``: the client WebSocket connection first,
    the server's event loop second.
    """

    def __init__(self):
        self.result = ""  # most recent recognition text
        self.state = ""  # name of the most recent state-changing event

    def _send_blocking(self, args):
        r"""Send ``self.result`` to the client and wait until it is sent.

        The connection belongs to the server's event loop, so the coroutine is
        handed to that loop instead of being run on a new loop with
        ``asyncio.run``: asyncio objects must only be driven from their own
        loop. Raises whatever ``websocket.send`` raises, or ``TimeoutError``
        after SEND_TIMEOUT seconds.
        """
        websocket, loop = args[0], args[1]
        future = asyncio.run_coroutine_threadsafe(websocket.send(self.result), loop)
        future.result(timeout=SEND_TIMEOUT)

    def on_sentence_begin(self, message, *args):
        print("on_sentence_begin")

    def on_sentence_end(self, message, *args):
        self.result = json.loads(message)["payload"]["result"]
        self.state = "sentence_end"
        print(f"on_sentence_end: {self.result}")
        self._send_blocking(args)

    def on_start(self, message, *args):
        print("on_start")

    def on_error(self, message, *args):
        print(f"on_error from aliyun {message}")
        self.state = "error"

    def on_close(self, *args):
        print("on_close")

    def on_result_chg(self, message, *args):
        self.result = json.loads(message)["payload"]["result"]
        self.state = "result_chg"
        print(f"on_result_chg: {self.result}")
        self._send_blocking(args)

    def on_completed(self, message, *args):
        print("on_completed")
        self.state = "completed"


async def asr(websocket):
    r"""Speech recognition service, Automatic Speech Recognition (ASR).

    Forwards every message received from the client to the transcriber until
    the b"$END$" marker arrives, stops the transcriber, waits until a callback
    has reported "completed" or "error", and finally sends "$END$" back.
    """
    callback = FuncCallback()
    loop = asyncio.get_running_loop()
    sr = nls.NlsSpeechTranscriber(
        url=URL,
        token=TOKEN,
        appkey=APPKEY,
        on_sentence_begin=callback.on_sentence_begin,
        on_sentence_end=callback.on_sentence_end,
        on_start=callback.on_start,
        on_result_changed=callback.on_result_chg,
        on_completed=callback.on_completed,
        on_error=callback.on_error,
        on_close=callback.on_close,
        callback_args=[websocket, loop],
    )
    # The SDK calls are synchronous. Run them in worker threads so the event
    # loop stays free to execute the sends the callbacks are waiting for;
    # calling them directly would stall the loop while a callback blocks on it.
    await asyncio.to_thread(
        sr.start,
        aformat="pcm",  # supported formats: pcm, opu, opus
        enable_intermediate_result=True,
        enable_punctuation_prediction=True,
        enable_inverse_text_normalization=True,
    )
    async for message in websocket:
        if message == b"$END$":
            break
        await asyncio.to_thread(sr.send_audio, message)
    await asyncio.to_thread(sr.stop)
    while callback.state not in ("completed", "error"):
        await asyncio.sleep(0.1)
        print(f"callback.state: {callback.state}")
    await websocket.send("$END$")  # send the custom end marker


async def main():
    r"""Start the WebSocket server on 127.0.0.1:8765 and serve forever."""
    host = "127.0.0.1"
    port = 8765
    async with serve(asr, host, port) as server:
        print(f"Server started {host}:{port}...")
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
# encoding: utf-8
# author: qbit
# date: 2025-03-14
# summary: async server, scheduling variant -- a WebSocket server that
#          receives speech recognition requests, calls Alibaba Cloud and
#          returns the recognised text. Starting a new event loop with
#          asyncio.run for every message is rather heavy, so this version
#          schedules each send on the server's already-running loop.
import asyncio
import json

import nls
from websockets.asyncio.server import serve

URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
# See https://help.aliyun.com/document_detail/450255.html for how to get a token
TOKEN = "your_token"
# Get the Appkey from the console: https://nls-portal.console.aliyun.com/applist
APPKEY = "your_key"


class FuncCallback:
    r"""Callback functions required by nls.NlsSpeechTranscriber.

    The callbacks are plain synchronous functions. They receive the entries
    of ``callback_args`` as ``*args``: the client WebSocket connection first,
    the server's event loop second.
    """

    def __init__(self):
        self.result = ""  # most recent recognition text
        self.state = ""  # name of the most recent state-changing event
        self.pending = []  # one concurrent.futures.Future per scheduled send

    def _schedule_send(self, args):
        r"""Schedule sending ``self.result`` on the server's loop, without waiting.

        ``loop.create_task`` must only be called from the loop's own thread:
        called from elsewhere it does not wake the loop, so the task may not
        start until the loop next runs for another reason.
        ``asyncio.run_coroutine_threadsafe`` is the thread-safe way to hand a
        coroutine to a running loop. The returned future is kept so the
        handler can wait for the send to finish.
        """
        websocket, loop = args[0], args[1]
        future = asyncio.run_coroutine_threadsafe(websocket.send(self.result), loop)
        self.pending.append(future)

    def on_sentence_begin(self, message, *args):
        print("on_sentence_begin")

    def on_sentence_end(self, message, *args):
        self.result = json.loads(message)["payload"]["result"]
        self.state = "sentence_end"
        print(f"on_sentence_end: {self.result}")
        self._schedule_send(args)

    def on_start(self, message, *args):
        print("on_start")

    def on_error(self, message, *args):
        print(f"on_error from aliyun {message}")
        self.state = "error"

    def on_close(self, *args):
        print("on_close")

    def on_result_chg(self, message, *args):
        self.result = json.loads(message)["payload"]["result"]
        self.state = "result_chg"
        print(f"on_result_chg: {self.result}")
        self._schedule_send(args)

    def on_completed(self, message, *args):
        print("on_completed")
        self.state = "completed"


async def asr(websocket):
    r"""Speech recognition service, Automatic Speech Recognition (ASR).

    Forwards every message received from the client to the transcriber until
    the b"$END$" marker arrives, stops the transcriber, waits until a callback
    has reported "completed" or "error" and every scheduled result has been
    sent, and finally sends "$END$" back.
    """
    callback = FuncCallback()
    loop = asyncio.get_running_loop()
    sr = nls.NlsSpeechTranscriber(
        url=URL,
        token=TOKEN,
        appkey=APPKEY,
        on_sentence_begin=callback.on_sentence_begin,
        on_sentence_end=callback.on_sentence_end,
        on_start=callback.on_start,
        on_result_changed=callback.on_result_chg,
        on_completed=callback.on_completed,
        on_error=callback.on_error,
        on_close=callback.on_close,
        callback_args=[websocket, loop],
    )
    # The SDK calls are synchronous. Run them in worker threads so they cannot
    # stall the event loop (and with it the sends scheduled by the callbacks).
    await asyncio.to_thread(
        sr.start,
        aformat="pcm",  # supported formats: pcm, opu, opus
        enable_intermediate_result=True,
        enable_punctuation_prediction=True,
        enable_inverse_text_normalization=True,
    )
    async for message in websocket:
        if message == b"$END$":
            break
        await asyncio.to_thread(sr.send_audio, message)
    await asyncio.to_thread(sr.stop)
    while callback.state not in ("completed", "error"):
        await asyncio.sleep(0.1)
        print(f"callback.state: {callback.state}")
    # Make sure every result scheduled by the callbacks has actually been sent
    # before the end marker; a failed send is re-raised here.
    await asyncio.gather(*(asyncio.wrap_future(f) for f in callback.pending))
    await websocket.send("$END$")  # send the custom end marker


async def main():
    r"""Start the WebSocket server on 127.0.0.1:8765 and serve forever."""
    host = "127.0.0.1"
    port = 8765
    async with serve(asr, host, port) as server:
        print(f"Server started {host}:{port}...")
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
相關閲讀
- 阿里雲實時語音識別官方文檔 https://help.aliyun.com/zh/isi/developer-reference/sdk-for-py...
- websockets GitHub 倉庫:https://github.com/python-websockets/websockets
本文出自 qbit snap