在美股數據服務中,行情 API 通常分為幾種類型,每種接口都有其特定的功能與應用場景:
1. 延遲行情接口
顧名思義,這類接口提供的行情數據會存在時間延遲,通常為 15 分鐘左右。也就是説,你看到的成交價格是 15 分鐘前的市場價格。延遲行情接口是最常見的類型,許多投資類應用(如雪球或部分證券交易 App)展示的價格數據,實際上都屬於延遲行情。
2. 實時行情接口
實時行情接口提供即時更新的市場數據,包括股票的最新價格、成交量、漲跌幅等信息。對於需要毫秒級反應的 量化交易系統 或 高頻交易策略 而言,實時數據的準確性和時效性至關重要,因此這類接口通常是專業交易者的首選。
3. 歷史行情接口
歷史行情接口則提供過去一段時間內的市場數據,如開盤價、收盤價、最高價、最低價等。這類數據對於進行 技術分析、回測交易策略 或 研究市場規律 的投資者來説十分重要,是量化研究和模型訓練的核心基礎。
在當今全球化的金融環境中,美股行情 API 已成為各類金融應用的關鍵基礎。通過接入行情 API,用户可以獲得 實時、全面且高精度 的市場數據,從而在交易決策、投資分析以及產品體驗上佔據競爭優勢。
量化交易團隊 藉助行情 API 構建並優化交易策略,確保每個決策都基於最新的市場動態。
個人投資者與分析師 可以利用這些數據進行深度的市場研究與趨勢預測。
投資類應用與財經媒體平台 則通過實時行情提升用户體驗,為用户提供權威、及時的市場信息。
無論是機構投資者還是個人交易者,美股行情 API 都是獲取市場洞察、提高決策質量、把握交易時機的重要工具。
下面是美股實時行情接口的請求示例。
RESTful
下面的代碼可以查詢批量K線,支持1分鐘到1年的不同週期,具體可以看這個Github文檔。
import requests
# www.infoway.io
api_url = 'https://data.infoway.io/stock/batch_kline/1/10/002594.SZ%2C00285.HK%2CTSLA.US'
# 設置請求頭
headers = {
'User-Agent': 'Mozilla/5.0',
'Accept': 'application/json',
'apiKey': 'yourApikey'
}
# 發送GET請求
response = requests.get(api_url, headers=headers)
# 輸出結果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")
Websocket
對於高頻交易來説,數據的延遲要求更高,所以使用Websocket是更好的選擇,下面的WS代碼示例可以訂閲K線、成交明細、盤口。支持跨市場查詢,比如我們可以同時查美股、港股、A股,甚至外匯和期貨:
import json
import time
import schedule
import threading
import websocket
from loguru import logger
#www.infoway.io
class WebsocketExample:
def __init__(self):
self.session = None
self.ws_url = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey"
self.reconnecting = False
self.is_ws_connected = False # 添加連接狀態標誌
def connect_all(self):
"""建立WebSocket連接並啓動自動重連機制"""
try:
self.connect(self.ws_url)
self.start_reconnection(self.ws_url)
except Exception as e:
logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")
def start_reconnection(self, url):
"""啓動定時重連檢查"""
def check_connection():
if not self.is_connected():
logger.debug("Reconnection attempt...")
self.connect(url)
# 使用線程定期檢查連接狀態
schedule.every(10).seconds.do(check_connection)
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
threading.Thread(target=run_scheduler, daemon=True).start()
def is_connected(self):
"""檢查WebSocket連接狀態"""
return self.session and self.is_ws_connected
def connect(self, url):
"""建立WebSocket連接"""
try:
if self.is_connected():
self.session.close()
self.session = websocket.WebSocketApp(
url,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# 啓動WebSocket連接(非阻塞模式)
threading.Thread(target=self.session.run_forever, daemon=True).start()
except Exception as e:
logger.error(f"Failed to connect to the server: {str(e)}")
def on_open(self, ws):
"""WebSocket連接建立成功後的回調"""
logger.info(f"Connection opened")
self.is_ws_connected = True # 設置連接狀態為True
try:
# 發送實時成交明細訂閲請求
trade_send_obj = {
"code": 10000,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "BTCUSDT"}
}
self.send_message(trade_send_obj)
# 不同請求之間間隔一段時間
time.sleep(5)
# 發送實時盤口數據訂閲請求
depth_send_obj = {
"code": 10003,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": {"codes": "BTCUSDT"}
}
self.send_message(depth_send_obj)
# 不同請求之間間隔一段時間
time.sleep(5)
# 發送實時K線數據訂閲請求
kline_data = {
"arr": [
{
"type": 1,
"codes": "BTCUSDT"
}
]
}
kline_send_obj = {
"code": 10006,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
"data": kline_data
}
self.send_message(kline_send_obj)
# 啓動定時心跳任務
schedule.every(30).seconds.do(self.ping)
except Exception as e:
logger.error(f"Error sending initial messages: {str(e)}")
def on_message(self, ws, message):
"""接收消息的回調"""
try:
logger.info(f"Message received: {message}")
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
def on_close(self, ws, close_status_code, close_msg):
"""連接關閉的回調"""
logger.info(f"Connection closed: {close_status_code} - {close_msg}")
self.is_ws_connected = False # 設置連接狀態為False
def on_error(self, ws, error):
"""錯誤處理的回調"""
logger.error(f"WebSocket error: {str(error)}")
self.is_ws_connected = False # 發生錯誤時設置連接狀態為False
def send_message(self, message_obj):
"""發送消息到WebSocket服務器"""
if self.is_connected():
try:
self.session.send(json.dumps(message_obj))
except Exception as e:
logger.error(f"Error sending message: {str(e)}")
else:
logger.warning("Cannot send message: Not connected")
def ping(self):
"""發送心跳包"""
ping_obj = {
"code": 10010,
"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
}
self.send_message(ping_obj)
# 使用示例
if __name__ == "__main__":
ws_client = WebsocketExample()
ws_client.connect_all()
# 保持主線程運行
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("Exiting...")
if ws_client.is_connected():
ws_client.session.close()