Stories

Detail Return Return

印度尼西亞股票數據API對接實現 - Stories Detail

環境準備

首先安裝必要的依賴包:

pip install requests websocket-client pandas numpy

基礎配置

import requests
import json
import websocket
import threading
import time
from datetime import datetime

# API配置
API_KEY = "YOUR_API_KEY"  # 替換為您的實際API密鑰
BASE_URL = "https://api.stocktv.top"
WS_URL = "wss://ws-api.stocktv.top/connect"

# 印尼股票代碼映射(示例)
IDX_SYMBOLS = {
    "BBCA": "Bank Central Asia",
    "BBRI": "Bank Rakyat Indonesia",
    "TLKM": "Telkom Indonesia",
    "ASII": "Astra International",
    "UNVR": "Unilever Indonesia"
}

REST API實現

1. 獲取印尼股票列表

def get_indonesia_stocks(page=1, page_size=100):
    """獲取印尼交易所股票列表"""
    url = f"{BASE_URL}/stock/stocks"
    params = {
        "countryId": 42,      # 印尼國家ID
        "exchangeId": 62,     # IDX交易所ID
        "pageSize": page_size,
        "page": page,
        "key": API_KEY
    }
    
    try:
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]["records"]
            else:
                print(f"API Error: {data.get('message')}")
        else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
        print(f"Error fetching stock list: {str(e)}")
    
    return []

# 示例:獲取第一頁股票列表
stocks = get_indonesia_stocks()
for stock in stocks:
    print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")

2. 查詢特定股票詳情

def get_stock_detail(symbol_or_id):
    """獲取股票詳細信息"""
    url = f"{BASE_URL}/stock/queryStocks"
    
    # 判斷是symbol還是id
    if isinstance(symbol_or_id, str) and symbol_or_id.isdigit():
        params = {"id": symbol_or_id, "key": API_KEY}
    else:
        params = {"symbol": symbol_or_id, "key": API_KEY}
    
    try:
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200 and data["data"]:
                return data["data"][0]
            else:
                print(f"API Error: {data.get('message')}")
        else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
        print(f"Error fetching stock detail: {str(e)}")
    
    return None

# 示例:獲取BBCA股票詳情
bbca_detail = get_stock_detail("BBCA")
if bbca_detail:
    print(f"BBCA當前價格: {bbca_detail['last']}")
    print(f"漲跌幅: {bbca_detail['chgPct']}%")

3. 獲取指數數據

def get_indonesia_indices():
    """獲取印尼主要指數"""
    url = f"{BASE_URL}/stock/indices"
    params = {
        "countryId": 42,  # 印尼國家ID
        "key": API_KEY
    }
    
    try:
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
        else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
        print(f"Error fetching indices: {str(e)}")
    
    return []

# 示例:獲取印尼指數
indices = get_indonesia_indices()
for index in indices:
    print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")

4. 獲取K線數據

def get_kline_data(pid, interval="PT15M"):
    """獲取股票K線數據"""
    url = f"{BASE_URL}/stock/kline"
    params = {
        "pid": pid,
        "interval": interval,
        "key": API_KEY
    }
    
    try:
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
        else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
        print(f"Error fetching kline data: {str(e)}")
    
    return []

# 示例:獲取BBCA的15分鐘K線數據
bbca_kline = get_kline_data(41602, "PT15M")
for kline in bbca_kline[:5]:  # 顯示前5條
    dt = datetime.fromtimestamp(kline["time"] / 1000)
    print(f"{dt}: O:{kline['open']} H:{kline['high']} L:{kline['low']} C:{kline['close']}")

5. 獲取漲跌排行榜

def get_top_gainers():
    """獲取漲幅榜"""
    url = f"{BASE_URL}/stock/updownList"
    params = {
        "countryId": 42,  # 印尼國家ID
        "type": 1,         # 1=漲幅榜, 2=跌幅榜
        "key": API_KEY
    }
    
    try:
        response = requests.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
        else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
        print(f"Error fetching top gainers: {str(e)}")
    
    return []

# 示例:獲取印尼漲幅榜
gainers = get_top_gainers()
for stock in gainers[:10]:  # 顯示前10名
    print(f"{stock['symbol']}: {stock['last']} (+{stock['chgPct']}%)")

WebSocket實時數據

WebSocket客户端實現

class IDXWebSocketClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.ws = None
        self.connected = False
        self.subscriptions = set()
        
        # 啓動連接
        self.connect()
        
        # 啓動心跳線程
        threading.Thread(target=self.heartbeat, daemon=True).start()
    
    def connect(self):
        """連接WebSocket服務器"""
        ws_url = f"{WS_URL}?key={self.api_key}"
        
        self.ws = websocket.WebSocketApp(
            ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        
        # 啓動WebSocket線程
        threading.Thread(target=self.ws.run_forever).start()
        time.sleep(1)  # 等待連接建立
    
    def on_open(self, ws):
        """連接建立回調"""
        print("已連接到印尼股票實時數據服務")
        self.connected = True
        
        # 重新訂閲之前訂閲的股票
        if self.subscriptions:
            self.subscribe(list(self.subscriptions))
    
    def on_message(self, ws, message):
        """接收消息回調"""
        try:
            data = json.loads(message)
            
            # 處理實時行情數據
            if "pid" in data:
                symbol = data.get("symbol", "Unknown")
                price = data.get("last_numeric", 0)
                change = data.get("pc", 0)
                change_pct = data.get("pcp", 0)
                
                print(f"實時行情 [{symbol}]: {price} ({change} / {change_pct}%)")
                
            # 處理心跳響應
            elif data.get("action") == "pong":
                pass
                
        except Exception as e:
            print(f"處理實時數據時出錯: {str(e)}")
    
    def on_error(self, ws, error):
        """錯誤處理回調"""
        print(f"WebSocket錯誤: {str(error)}")
    
    def on_close(self, ws, close_status_code, close_msg):
        """連接關閉回調"""
        print("WebSocket連接已關閉")
        self.connected = False
        
        # 嘗試重新連接
        print("嘗試重新連接...")
        time.sleep(3)
        self.connect()
    
    def subscribe(self, pids):
        """訂閲股票"""
        if not self.connected:
            print("未連接,無法訂閲")
            return False
        
        # 添加到訂閲列表
        self.subscriptions.update(pids)
        
        # 構造訂閲消息
        message = json.dumps({
            "action": "subscribe",
            "pids": list(pids)
        })
        
        # 發送訂閲請求
        self.ws.send(message)
        print(f"已訂閲: {', '.join(map(str, pids))}")
        return True
    
    def unsubscribe(self, pids):
        """取消訂閲股票"""
        if not self.connected:
            print("未連接,無法取消訂閲")
            return False
        
        # 從訂閲列表中移除
        for pid in pids:
            self.subscriptions.discard(pid)
        
        # 構造取消訂閲消息
        message = json.dumps({
            "action": "unsubscribe",
            "pids": list(pids)
        })
        
        # 發送取消訂閲請求
        self.ws.send(message)
        print(f"已取消訂閲: {', '.join(map(str, pids))}")
        return True
    
    def heartbeat(self):
        """心跳維護"""
        while True:
            if self.connected:
                try:
                    # 每30秒發送一次心跳
                    self.ws.send(json.dumps({"action": "ping"}))
                except Exception as e:
                    print(f"發送心跳失敗: {str(e)}")
            time.sleep(30)

# 使用示例
if __name__ == "__main__":
    # 創建WebSocket客户端
    ws_client = IDXWebSocketClient(API_KEY)
    
    # 訂閲股票(需要先獲取股票ID)
    time.sleep(2)  # 等待連接建立
    ws_client.subscribe([41602, 41605])  # 訂閲BBCA和BRIS
    
    # 保持主線程運行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("程序已終止")

高級功能實現

1. 數據緩存策略

from cachetools import TTLCache

class IDXDataCache:
    def __init__(self, maxsize=100, ttl=60):
        """初始化數據緩存"""
        self.cache = TTLCache(maxsize=maxsize, ttl=ttl)
    
    def get_stock_data(self, symbol_or_id):
        """獲取股票數據(帶緩存)"""
        # 檢查緩存
        if symbol_or_id in self.cache:
            return self.cache[symbol_or_id]
        
        # 從API獲取
        data = get_stock_detail(symbol_or_id)
        
        # 更新緩存
        if data:
            self.cache[symbol_or_id] = data
        
        return data

# 使用示例
cache = IDXDataCache()
bbca_data = cache.get_stock_data("BBCA")

2. 實時數據處理器

class RealTimeDataProcessor:
    def __init__(self):
        self.data_buffer = {}
        self.batch_size = 10
        self.last_process_time = time.time()
    
    def add_data(self, symbol, data):
        """添加實時數據到緩衝區"""
        if symbol not in self.data_buffer:
            self.data_buffer[symbol] = []
        
        self.data_buffer[symbol].append(data)
        
        # 檢查是否達到批處理條件
        current_time = time.time()
        if (len(self.data_buffer[symbol]) >= self.batch_size or 
            current_time - self.last_process_time >= 1.0):
            self.process_data(symbol)
            self.last_process_time = current_time
    
    def process_data(self, symbol):
        """處理緩衝區的數據"""
        if symbol not in self.data_buffer or not self.data_buffer[symbol]:
            return
        
        data_points = self.data_buffer[symbol]
        
        # 計算統計指標
        prices = [d["last_numeric"] for d in data_points]
        volumes = [d.get("turnover_numeric", 0) for d in data_points]
        
        avg_price = sum(prices) / len(prices)
        max_price = max(prices)
        min_price = min(prices)
        total_volume = sum(volumes)
        
        print(f"\n{symbol} 實時數據統計 (最近 {len(data_points)} 個更新):")
        print(f"平均價格: {avg_price:.2f}, 最高: {max_price:.2f}, 最低: {min_price:.2f}")
        print(f"總成交量: {total_volume}")
        
        # 清空緩衝區
        self.data_buffer[symbol] = []

# 在WebSocket客户端的on_message方法中使用
processor = RealTimeDataProcessor()

def on_message(self, ws, message):
    try:
        data = json.loads(message)
        
        if "pid" in data:
            symbol = data.get("symbol", "Unknown")
            processor.add_data(symbol, data)
            
    except Exception as e:
        print(f"處理實時數據時出錯: {str(e)}")

3. 錯誤處理與重試機制

def api_request_with_retry(url, params, max_retries=3):
    """帶重試機制的API請求"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=10)
            if response.status_code == 200:
                data = response.json()
                if data.get("code") == 200:
                    return data
                elif data.get("code") == 429:  # 請求過多
                    retry_after = int(data.get("retryAfter", 30))
                    print(f"請求過於頻繁,等待 {retry_after} 秒後重試...")
                    time.sleep(retry_after)
                else:
                    print(f"API返回錯誤: {data.get('message')}")
            else:
                print(f"請求失敗,狀態碼: {response.status_code}")
        except Exception as e:
            print(f"請求異常: {str(e)}")
        
        # 指數退避等待
        wait_time = 2 ** attempt
        print(f"等待 {wait_time} 秒後重試 (嘗試 {attempt+1}/{max_retries})")
        time.sleep(wait_time)
    
    print(f"請求失敗,已達最大重試次數 {max_retries}")
    return None

完整示例應用

class IDXStockMonitor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.ws_client = None
        self.data_cache = IDXDataCache()
        self.monitored_stocks = set()
        
    def start_monitoring(self, symbols):
        """開始監控指定股票"""
        print("開始監控印尼股票...")
        
        # 獲取股票ID
        stock_ids = []
        for symbol in symbols:
            stock_data = self.data_cache.get_stock_data(symbol)
            if stock_data:
                stock_ids.append(stock_data["id"])
                self.monitored_stocks.add(symbol)
                print(f"已添加監控: {symbol} (ID: {stock_data['id']})")
            else:
                print(f"無法獲取股票信息: {symbol}")
        
        # 啓動WebSocket連接
        if stock_ids:
            self.ws_client = IDXWebSocketClient(self.api_key)
            time.sleep(2)  # 等待連接建立
            self.ws_client.subscribe(stock_ids)
        
        # 啓動定期數據更新
        self.start_periodic_updates()
    
    def start_periodic_updates(self):
        """啓動定期數據更新"""
        def update_loop():
            while True:
                # 每5分鐘更新一次指數數據
                self.update_indices()
                
                # 每10分鐘更新一次股票列表
                if len(self.monitored_stocks) < 10:  # 只更新少量股票
                    self.update_stock_list()
                
                time.sleep(300)  # 5分鐘
        
        threading.Thread(target=update_loop, daemon=True).start()
    
    def update_indices(self):
        """更新指數數據"""
        print("\n更新印尼指數數據...")
        indices = get_indonesia_indices()
        for index in indices:
            print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")
    
    def update_stock_list(self):
        """更新股票列表"""
        print("\n更新印尼股票列表...")
        stocks = get_indonesia_stocks(page_size=50)
        for stock in stocks[:10]:  # 只顯示前10只
            print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")
    
    def run(self):
        """運行監控"""
        try:
            # 監控主要印尼股票
            symbols_to_monitor = ["BBCA", "BBRI", "TLKM", "ASII", "UNVR"]
            self.start_monitoring(symbols_to_monitor)
            
            # 保持主線程運行
            while True:
                time.sleep(1)
                
        except KeyboardInterrupt:
            print("\n監控已停止")
        except Exception as e:
            print(f"監控出錯: {str(e)}")

# 啓動監控
if __name__ == "__main__":
    monitor = IDXStockMonitor(API_KEY)
    monitor.run()

部署建議

  1. API密鑰管理:不要將API密鑰硬編碼在代碼中,使用環境變量或配置文件
  2. 錯誤處理:增加更完善的錯誤處理和日誌記錄
  3. 速率限制:遵守API的速率限制,避免頻繁請求
  4. 數據存儲:考慮將重要數據存儲到數據庫中以供後續分析
  5. 監控告警:設置價格波動告警機制
# 從環境變量獲取API密鑰
import os
API_KEY = os.getenv("STOCKTV_API_KEY", "YOUR_API_KEY")

以上是一個完整的印尼股票數據API對接實現方案。您可以根據實際需求進行調整和擴展。如果您需要更多特定功能或有任何問題,請隨時告訴我。

user avatar tianmiaogongzuoshi_5ca47d59bef41 Avatar smalike Avatar qingzhan Avatar aqiongbei Avatar huajianketang Avatar inslog Avatar xiaoxxuejishu Avatar zero_dev Avatar solvep Avatar zhulongxu Avatar ccVue Avatar yixiyidong Avatar
Favorites 73 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.