代碼實現多模型校準性能對比分析,原理為通過 LSTM/BiLSTM 捕捉時序特徵,傳統機器學習模型擬合數據分佈,利用 calibration_curve 評估模型預測概率與實際正例比例的一致性。實現方法為加載 / 生成標準化數據,訓練 / 加載 LSTM、BiLSTM、隨機森林等多類模型,生成預測概率後繪製校準曲線,量化對比不同模型的校準效果,為模型選擇與優化提供實驗依據。
(圖中數據均為隨機示例數據,以實際訓練結果為準)
01
—
模塊導入與參數配置段
導入數據分析、模型訓練、繪圖等庫,配置數據、模型、繪圖等參數,統一管理實驗配置。原理是模塊化管理依賴與參數,提升代碼可維護性。實現為 import 導入庫,定義常量和字典存儲各類參數。
import os
import json
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Bidirectional, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.calibration import calibration_curve
from sklearn.metrics import log_loss
import warnings
warnings.filterwarnings('ignore')
# ---------------------- 數據相關參數 ----------------------
DATA_FILE = "data(模型比較).xlsx"
FEATURE_NUM = 10
TARGET_COL = "target"
TEST_SIZE = 0.3
RANDOM_SEED = 42
TIME_STEPS = 5
# ---------------------- LSTM/BiLSTM模型參數 ----------------------
LSTM_UNITS = 64
BILSTM_UNITS = 64
BATCH_SIZE = 32
EPOCHS = 10
LEARNING_RATE = 0.001
DROPOUT_RATE = 0.2
PATIENCE = 3
# ---------------------- 傳統模型網格搜索參數(最優模型搜索) ----------------------
RF_PARAM_GRID = {
'n_estimators': [50, 100],
'max_depth': [5, 10],
'min_samples_split': [2, 5]
}
SVM_PARAM_GRID = {
'C': [0.1, 1],
'gamma': ['scale', 'auto'],
'kernel': ['rbf']
}
LR_PARAM_GRID = {
'C': [0.1, 1, 10],
'penalty': ['l1', 'l2'],
'solver': ['liblinear']
}
# ---------------------- 繪圖參數 ----------------------
PLOT_CONFIG = {'bg_color': '#F5F5F5','diag_line': '#FF6347','xy_label': '#333333',
'fig_size': (10, 8),
'font_size': {
'title': 20,
'label': 18,
'tick': 14,
'legend': 12
},
'model_colors': {
'LSTM': '#1f77b4',
'RF': '#ff7f0e',
'BiLSTM': '#2ca02c',
'SVM': '#d62728',
'MLR': '#9467bd',
'邏輯迴歸': '#8c564b'
}}
# ---------------------- 模型文件夾配置 ----------------------
MODEL_FOLDERS = ["LSTM", "RF", "BiLSTM", "SVM", "MLR", "邏輯迴歸"]
MODEL_SUFFIX = {
'LSTM': '.h5',
'BiLSTM': '.h5',
'RF': '.pkl',
'SVM': '.pkl',
'MLR': '.pkl',
'邏輯迴歸': '.pkl'
}
PARAM_FILE_SUFFIX = '_params.json'
02
—
中文字體設置函數
配置 matplotlib 的字體參數,解決中文顯示異常和負號渲染問題。原理是修改 rcParams 配置項指定中文字體,設置字體渲染格式。實現為調整 font.sans-serif 等參數,指定兼容中文字體。
def setup_chinese_font():
plt.rcParams['text.usetex'] = False
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['pdf.fonttype'] = 42
plt.rcParams['ps.fonttype'] = 42
03
—
隨機數據生成與保存函數
生成指定維度的隨機特徵和二分類標籤,保存為 Excel 文件。原理是基於隨機種子生成正態分佈特徵和離散標籤,保證可復現。實現為 numpy 生成數據,pandas 拼接並保存為 Excel。
def generate_and_save_data(file_path, feature_num=10, sample_num=1000):
np.random.seed(RANDOM_SEED)
features = pd.DataFrame(
np.random.randn(sample_num, feature_num),
columns=[f"feature_{i + 1}" for i in range(feature_num)]
)
target = pd.Series(
np.random.randint(0, 2, size=sample_num),
name=TARGET_COL
)
df = pd.concat([features, target], axis=1)
df.to_excel(file_path, index=False, engine='openpyxl')
print(f"隨機數據已生成並保存到: {file_path}")
return df
04
—
數據加載 / 生成函數
檢查數據文件是否存在,存在則加載,否則調用生成函數創建。原理是文件系統存在性判斷,保證數據來源的一致性。實現為 os.path.exists 判斷,pandas 加載或調用生成函數。
def load_or_generate_data(file_path):
if os.path.exists(file_path):
df = pd.read_excel(file_path, engine='openpyxl')
print(f"成功加載數據文件: {file_path}")
else:
print(f"數據文件 {file_path} 不存在,正在生成隨機數據...")
df = generate_and_save_data(file_path, FEATURE_NUM)
return df
def create_model_folders(folders):
for folder in folders:
if not os.path.exists(folder):
os.makedirs(folder)
print(f"創建模型文件夾: {folder}")
05
—
LSTM 數據重塑函數
將二維特徵數據重塑為 LSTM 所需的三維時序格式。原理是滑動窗口截取固定長度的時序片段,適配 LSTM 輸入維度。實現為遍歷樣本生成 time_steps 長度的序列,轉換為 numpy 數組。
def reshape_for_lstm(data, time_steps):
n_samples = data.shape[0] - time_steps + 1
reshaped_data = []
for i in range(n_samples):
reshaped_data.append(data[i:i + time_steps, :])
return np.array(reshaped_data)
06
—
LSTM 模型訓練函數
構建 LSTM 模型,訓練並保存模型及參數。原理是 LSTM 捕捉時序特徵,Dropout 防止過擬合,Adam 優化二進制交叉熵損失。實現為 Sequential 構建模型,fit 訓練,save 保存模型,json 保存參數。
def train_lstm_model(X_train, y_train, model_path, params):
X_train_lstm = reshape_for_lstm(X_train, TIME_STEPS)
y_train_lstm = y_train[TIME_STEPS - 1:]
model = Sequential([
LSTM(params['units'], input_shape=(TIME_STEPS, FEATURE_NUM), return_sequences=False),
Dropout(params['dropout_rate']),
Dense(1, activatinotallow='sigmoid')
])
optimizer = Adam(learning_rate=params['lr'])
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(
X_train_lstm, y_train_lstm,
batch_size=params['batch_size'],
epochs=params['epochs'],
validation_split=0.1,
verbose=1
)
model.save(model_path)
with open(model_path.replace('.h5', PARAM_FILE_SUFFIX), 'w') as f:
json.dump(params, f, indent=4)
print(f"LSTM模型已保存到: {model_path}")
return model
07
—
BiLSTM 模型訓練函數
構建雙向 LSTM 模型,訓練並保存模型及參數。原理是 BiLSTM 雙向捕捉時序特徵,提升時序信息利用效率。實現為 Bidirectional 封裝 LSTM 層,其餘訓練和保存邏輯同 LSTM。
def train_bilstm_model(X_train, y_train, model_path, params):
X_train_bilstm = reshape_for_lstm(X_train, TIME_STEPS)
y_train_bilstm = y_train[TIME_STEPS - 1:]
model = Sequential([
Bidirectional(LSTM(params['units'], return_sequences=False)),
Dropout(params['dropout_rate']),
Dense(1, activatinotallow='sigmoid')
])
optimizer = Adam(learning_rate=params['lr'])
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(
X_train_bilstm, y_train_bilstm,
batch_size=params['batch_size'],
epochs=params['epochs'],
validation_split=0.1,
verbose=1
)
model.save(model_path)
with open(model_path.replace('.h5', PARAM_FILE_SUFFIX), 'w') as f:
json.dump(params, f, indent=4)
print(f"BiLSTM模型已保存到: {model_path}")
return model
08
—
傳統模型訓練函數
對傳統機器學習模型進行網格搜索,找到最優參數並保存模型和參數。原理是 GridSearchCV 交叉驗證遍歷參數組合,選擇最優模型。實現為分模型類型初始化基模型,網格搜索後保存最優模型和參數。
def train_traditional_model(X_train, y_train, model_type, model_path, param_grid):
if model_type == 'RF':
base_model = RandomForestClassifier(random_state=RANDOM_SEED)
elif model_type == 'SVM':
base_model = SVC(probability=True, random_state=RANDOM_SEED)
elif model_type == 'MLR':
base_model = LinearRegression()
elif model_type == '邏輯迴歸':
base_model = LogisticRegression(random_state=RANDOM_SEED, max_iter=1000)
else:
raise ValueError(f"不支持的模型類型: {model_type}")
if model_type != 'MLR':
grid_search = GridSearchCV(
estimator=base_model,
param_grid=param_grid,
cv=3,
scoring='neg_log_loss' if model_type != 'MLR' else 'neg_mean_squared_error',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
best_model = grid_search.best_estimator_
best_params = grid_search.best_params_
else:
base_model.fit(X_train, y_train)
best_model = base_model
best_params = {}
best_params['feature_num'] = FEATURE_NUM
best_params['random_seed'] = RANDOM_SEED
best_params['test_size'] = TEST_SIZE
joblib.dump(best_model, model_path)
with open(model_path.replace('.pkl', PARAM_FILE_SUFFIX), 'w') as f:
json.dump(best_params, f, indent=4)
print(f"{model_type}最優模型已保存到: {model_path}")
return best_model
09
—
模型加載 / 訓練函數
檢查模型文件是否存在,存在則加載,否則調用對應訓練函數訓練。原理是文件存在性判斷,避免重複訓練。實現為分模型類型加載 h5/pkl 文件,無則調用對應訓練函數。
def load_or_train_model(folder_name, X_train, y_train):
model_path = os.path.join(folder_name, f"best_{folder_name}{MODEL_SUFFIX[folder_name]}")
if os.path.exists(model_path):
if folder_name in ['LSTM', 'BiLSTM']:
model = load_model(model_path)
else:
model = joblib.load(model_path)
param_path = model_path.replace(MODEL_SUFFIX[folder_name], PARAM_FILE_SUFFIX)
with open(param_path, 'r') as f:
model_params = json.load(f)
print(f"成功加載{folder_name}模型: {model_path}")
else:
print(f"{folder_name}模型不存在,正在訓練最優模型...")
if folder_name == 'LSTM':
lstm_params = {
'units': LSTM_UNITS,
'batch_size': BATCH_SIZE,
'epochs': EPOCHS,
'lr': LEARNING_RATE,
'dropout_rate': DROPOUT_RATE,
'time_steps': TIME_STEPS,
'feature_num': FEATURE_NUM
}
model = train_lstm_model(X_train, y_train, model_path, lstm_params)
model_params = lstm_params
elif folder_name == 'BiLSTM':
bilstm_params = {
'units': BILSTM_UNITS,
'batch_size': BATCH_SIZE,
'epochs': EPOCHS,
'lr': LEARNING_RATE,
'dropout_rate': DROPOUT_RATE,
'time_steps': TIME_STEPS,
'feature_num': FEATURE_NUM
}
model = train_bilstm_model(X_train, y_train, model_path, bilstm_params)
model_params = bilstm_params
elif folder_name == 'RF':
model = train_traditional_model(X_train, y_train, 'RF', model_path, RF_PARAM_GRID)
param_path = model_path.replace('.pkl', PARAM_FILE_SUFFIX)
with open(param_path, 'r') as f:
model_params = json.load(f)
elif folder_name == 'SVM':
model = train_traditional_model(X_train, y_train, 'SVM', model_path, SVM_PARAM_GRID)
param_path = model_path.replace('.pkl', PARAM_FILE_SUFFIX)
with open(param_path, 'r') as f:
model_params = json.load(f)
elif folder_name == 'MLR':
model = train_traditional_model(X_train, y_train, 'MLR', model_path, {})
param_path = model_path.replace('.pkl', PARAM_FILE_SUFFIX)
with open(param_path, 'r') as f:
model_params = json.load(f)
elif folder_name == '邏輯迴歸':
model = train_traditional_model(X_train, y_train, '邏輯迴歸', model_path, LR_PARAM_GRID)
param_path = model_path.replace('.pkl', PARAM_FILE_SUFFIX)
with open(param_path, 'r') as f:
model_params = json.load(f)
return model, model_params
10
—
模型預測函數
根據模型類型生成預測概率,適配 LSTM/BiLSTM 的時序輸入和傳統模型格式。原理是不同模型輸入維度和輸出格式不同,需針對性處理。實現為重塑 LSTM 輸入,裁剪 MLR 預測值,提取分類模型正類概率。
def get_model_predictions(model, model_type, X_data):
if model_type in ['LSTM', 'BiLSTM']:
X_reshaped = reshape_for_lstm(X_data, TIME_STEPS)
preds = model.predict(X_reshaped, verbose=0).flatten()
preds = np.pad(preds, (TIME_STEPS - 1, 0), mode='constant', constant_values=0.5)
elif model_type == 'MLR':
preds = model.predict(X_data)
preds = np.clip(preds, 0, 1)
else:
preds = model.predict_proba(X_data)[:, 1]
return preds
11
—
校準曲線繪製函數
繪製多模型的校準曲線,對比預測概率與實際正例比例。原理是 calibration_curve 計算分箱後的均值概率和正例比例,評估模型校準度。實現為 matplotlib 繪圖,設置樣式並保存為 PDF/PNG 格式。
def draw_calibration_comparison(true_labels, model_prob_dict, plot_title, plot_config, n_bins=10, filename=None):
setup_chinese_font()
fig, ax = plt.subplots(figsize=plot_config['fig_size'])
ax.set_facecolor(plot_config['bg_color'])
ax.grid(True,
which='both',
linestyle='--',
linewidth=2,
color='lightgrey',
dashes=(3, 5))
ax.plot([0, 1], [0, 1], color=plot_config['diag_line'], lw=2, linestyle='--', label='Perfect Calibration')
for model_name, probs in model_prob_dict.items():
valid_mask = np.arange(len(probs)) >= (TIME_STEPS - 1) if model_name in ['LSTM', 'BiLSTM'] else np.ones(
len(probs), dtype=bool)
if np.sum(valid_mask) < n_bins:
continue
frac_pos, mean_pred = calibration_curve(
true_labels[valid_mask], probs[valid_mask], n_bins=n_bins, strategy='uniform'
)
ax.plot(mean_pred, frac_pos,
color=plot_config['model_colors'][model_name],
lw=2.5, label=model_name,
marker='o', markersize=6)
ax.set_xlim([-0.02, 1.02])
ax.set_ylim([-0.02, 1.02])
ax.set_xlabel('Mean Predicted Probability', fnotallow=plot_config['font_size']['label'],
color=plot_config['xy_label'])
ax.set_ylabel('Fraction of Positives', fnotallow=plot_config['font_size']['label'],
color=plot_config['xy_label'])
ax.set_title(plot_title, fnotallow=plot_config['font_size']['title'], loc='center', pad=20)
ax.xaxis.set_major_locator(plt.MultipleLocator(0.25))
ax.yaxis.set_major_locator(plt.MultipleLocator(0.25))
ax.tick_params(axis='both', which='major', labelsize=plot_config['font_size']['tick'])
ax.legend(loc='upper left', fnotallow=plot_config['font_size']['legend'],
framenotallow=True, edgecolor='black', framealpha=0.9)
for spine in ax.spines.values():
spine.set_edgecolor('
#6B8E23
')
spine.set_linewidth(2)
plt.tight_layout()
if filename:
plt.savefig(f"{filename}_calibration.pdf", dpi=300, bbox_inches='tight')
plt.savefig(f"{filename}_calibration.png", dpi=300, bbox_inches='tight')
print(f"校準曲線已保存為: {filename}_calibration.pdf/png")
plt.close(fig)
12
—
主函數執行段
串聯數據加載、預處理、模型訓練 / 加載、預測生成、校準曲線繪製的全流程。原理是遵循機器學習實驗流程,標準化數據並對比多模型校準性能。實現為調用各函數完成數據處理,訓練模型後生成預測並繪圖。
if __name__ == "__main__":
df = load_or_generate_data(DATA_FILE)
X = df.drop(columns=[TARGET_COL]).values
y = df[TARGET_COL].values
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=TEST_SIZE, random_state=RANDOM_SEED, stratify=y
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
joblib.dump(scaler, "scaler.pkl")
create_model_folders(MODEL_FOLDERS)
trained_models = {}
model_params_dict = {}
for folder in MODEL_FOLDERS:
model, params = load_or_train_model(folder, X_train_scaled, y_train)
trained_models[folder] = model
model_params_dict[folder] = params
print("\n========== 所有模型參數彙總 ==========")
for model_name, params in model_params_dict.items():
print(f"\n{model_name} 參數:")
for k, v in params.items():
print(f" {k}: {v}")
train_prob_dict = {}
val_prob_dict = {}
for model_name, model in trained_models.items():
train_probs = get_model_predictions(model, model_name, X_train_scaled)
train_prob_dict[model_name] = train_probs
val_probs = get_model_predictions(model, model_name, X_val_scaled)
val_prob_dict[model_name] = val_probs
draw_calibration_comparison(
true_labels=y_train,
model_prob_dict=train_prob_dict,
plot_title="Calibration Curves on Training Set (Multi-Model Comparison)",
plot_cnotallow=PLOT_CONFIG,
filename="training_set"
)
draw_calibration_comparison(
true_labels=y_val,
model_prob_dict=val_prob_dict,
plot_title="Calibration Curves on Validation Set (Multi-Model Comparison)",
plot_cnotallow=PLOT_CONFIG,
filename="validation_set"
)
print("\n所有校準曲線繪製完成!")