一般來説,時間序列像一條由兩部分組成的河流:一部分是“平穩、能用直線或簡單公式描述”的水流(線性成分),另一部分是“突然的涌動、彎曲和複雜模式”(非線性成分)。
ARIMA 擅長抓住那部分可以用線性自迴歸/移動平均解釋的“規矩”水流;隨機森林擅長從複雜、非線性的關係裏找模式。
把兩者“融合”起來,就是先用 ARIMA 把序列裏能用線性解釋的部分拿掉(得到殘差),再讓 RF 去學習並預測那些剩下的、比較複雜的部分;
最後把兩部分加起來作為最終預測。這樣通常比單用 ARIMA 或單用 RF 更穩健,尤其是當序列同時含有線性和非線性成分時。
👇 每一篇都是乾貨滿滿滿滿哦!~
核心原理
問題與記號:
已知歷史序列 。我們要對未來 步做預測,即求 (為預測 horizon)。
用 表示 ARIMA 給出的 -步預測;用 表示隨機森林對殘差(或直接對 )的 -步預測。融合後的預測記為 。
2. ARIMA(線性基礎)
用回退算子 ()表示。ARIMA() 的一般寫法:
其中 , , 為白噪聲, 為常數項。
若先做差 ,則 滿足 ARMA():
AR 多步遞推公式(直觀):
- 一步預測(AR(p)):
- 兩步及以上使用遞歸,把已預測值代入:
(其中當索引落在歷史時使用真實觀測值)。 - ARIMA 的預測方差可由模型的 MA(∞) 展開係數 得到(理論上):
其中 。
隨機森林(RF,非線性部分)
隨機森林本質上是 棵迴歸樹的集成。若第 棵樹預測為 ,則 RF 的預測是平均:
對迴歸問題,RF 逼近條件期望 。在殘差建模情景,我們令目標變量為殘差 ,輸入 為從歷史構造的特徵(滯後項、ARIMA 預測、節假日、滾動統計量等),RF 學習映射 。
混合思想
假設時間序列可以拆成線性與非線性兩部分(加噪聲):
其中 為 ARIMA 能解釋的線性部分, 為非線性剩餘。ARIMA 擬合 ,殘差為
用 RF 去擬合 (或直接擬合 ),得到殘差的預測 。最後的融合預測是:
這是最直接、常見且直觀的公式。
多步預測有幾種常見策略,混合模型裏要特別注意選擇:
A. 迭代策略
- 訓練 RF 或 ARIMA 做一步預測,然後遞歸使用預測作為未來的輸入來得到 步。
- 若用迭代方式,預測步驟類似:
- 計算 。
- 用 構造下一次需要的特徵,再預測 ,以此類推。
- 缺點:誤差累積(尤其 RF 的一步預測誤差被帶到下一步)。
B. 直接策略
- 對每個 horizon 單獨訓練一個模型,直接預測 或殘差 :
最終 。 - 優點:避免誤差累積;缺點:需要為每個 訓練模型,數據/計算成本較高。
C. MIMO策略
- 直接訓練一個多輸出的 RF(或其他模型)一次預測向量 。適合想要保留預測間依賴結構的情況。
- 矩陣表示:定義目標矩陣 ,訓練模型學習 。
損失函數
對殘差預測器(RF)通常以均方誤差(MSE)為訓練目標:
其中 為 RF (樹集合)函數類。經驗解由隨機森林算法給出(bootstrap + 隨機特徵選擇 + 樹迴歸平均)。
評估指標
常見的有:
均方根誤差(RMSE):
平均絕對誤差(MAE):
平均絕對百分比誤差(MAPE):
注意:會有問題
對稱平均絕對百分比誤差(sMAPE):
置信區間/不確定性(如何合成)
若假設ARIMA 與 RF 預測誤差相互獨立,可近似把方差相加得到融合方差:
然後近似構造區間:
實務中RF 的不確定性通常用分位數迴歸森林(Quantile Regression Forest)或bootstrap方法估計;ARIMA 的方差可由模型給出。兩個方差合併時若非獨立需謹慎。
實現步驟
訓練階段
- 用訓練序列 擬合 ARIMA(),得到模型參數與訓練內預測 (通常做 in-sample one-step)。
- 計算殘差序列:
- 構造 RF 的訓練樣本。按照所選 multi-step 策略構造輸入 (滯後項、ARIMA 輸出、外生變量等)與目標(例如 direct:目標為 ;迭代:目標為 )。
- 用時間序列 CV 調參並訓練 RF(或若 direct,為每個 訓練一個 RF_h)。
- (可選)在驗證集上學習組合權重 。
預測階段(以 direct/residual 堆疊為例)
- 用 ARIMA 生成 。
- 構造相應的特徵 (含最新滯後觀測與 ARIMA 預測等)。
- 用 RF(或各個 RF_h)預測殘差 。
- 合成最終預測:
完整案例
我們用虛擬合成的時間序列(包含趨勢、季節性、線性 AR 成分、非線性成分、噪聲),先用 PyTorch 實現一個線性自迴歸(AR)模型 來捕捉線性部分(近似 ARIMA 的線性成分),得到 AR 預測並計算殘差;
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error
from statsmodels.tsa.arima.model import ARIMA
import warnings
warnings.filterwarnings("ignore")
USE_PYTORCH = True
try:
import torch
import torch.nn as nn
import torch.optim as optim
except Exception as e:
print("PyTorch import failed or not available. Will fall back to Ridge combiner. Error:", e)
USE_PYTORCH = False
# 1. 生成合成數據
def generate_synthetic_series(N=1200, seed=42):
np.random.seed(seed)
t = np.arange(N)
# 線性 AR-like 成分
phi1, phi2 = 0.6, -0.25
linear = np.zeros(N)
for i in range(2, N):
linear[i] = 1.5 + phi1 * linear[i-1] + phi2 * linear[i-2]
# 季節
seasonal = 3.0 * np.sin(2*np.pi*t/7)
# 非線性複雜項
nonlinear = 2.0*np.sin(2*np.pi*t/30) + 1.5*np.tanh(0.02*(t-400))*np.exp(-0.001*(t-400)**2)
# 異方差噪聲
noise = np.random.randn(N) * (0.5 + 0.005*(t % 50))
y = linear + seasonal + nonlinear + noise
dates = pd.date_range("2020-01-01", periods=N, freq='D')
df = pd.DataFrame({'y': y, 'linear': linear, 'seasonal': seasonal, 'nonlinear': nonlinear}, index=dates)
return df
# 2. 時間切分(嚴格按時間,避免泄露)
def time_split(df, train_end=900, val_end=1030):
# indices: [0:train_end) training, [train_end:val_end) validation, [val_end:] test
train = df.iloc[:train_end].copy()
val = df.iloc[train_end:val_end].copy()
test = df.iloc[val_end:].copy()
return train, val, test
# 3. ARIMA 擬合(僅在訓練集)並生成訓練內擬合值(用於殘差)
def fit_arima_and_residual(train_series, order=(2,0,1)):
model = ARIMA(train_series, order=order).fit()
fitted = model.fittedvalues # 訓練內 one-step 擬合(不使用未來數據)
residual = train_series - fitted
return model, fitted, residual
# 4. 構造特徵(只用滯後與滾動統計 + 日期特徵,所有特徵僅基於 t 時刻及之前)
def make_time_features(df, lags=24):
X = pd.DataFrame(index=df.index)
for lag in range(1, lags+1):
X[f'lag_{lag}'] = df['y'].shift(lag)
X['roll_mean_7'] = df['y'].rolling(window=7, min_periods=1).mean().shift(1)
X['roll_std_7'] = df['y'].rolling(window=7, min_periods=1).std().shift(1).fillna(0)
X['dayofweek'] = df.index.dayofweek
X['day_sin'] = np.sin(2*np.pi*df.index.dayofyear/365.25)
X['day_cos'] = np.cos(2*np.pi*df.index.dayofyear/365.25)
return X
# prepare direct supervised data for RF:
# at time t (we use features X_t), target is y_{t+h} (or residual_{t+h} when training residual models)
def prepare_direct_dataset(full_X, full_y, start_i, end_i, H, lags):
# start_i,end_i are integer index bounds in the full series
X_list = {h: [] for h in range(1, H+1)}
y_list = {h: [] for h in range(1, H+1)}
idx_list = {h: [] for h in range(1, H+1)}
for t_i in range(start_i + lags, end_i - H):
X_t = full_X.iloc[t_i].values
for h in range(1, H+1):
target = full_y.iloc[t_i + h]
X_list[h].append(X_t)
y_list[h].append(target)
idx_list[h].append(full_X.index[t_i])
# to numpy
for h in range(1, H+1):
if len(X_list[h]) > 0:
X_list[h] = np.vstack(X_list[h])
y_list[h] = np.array(y_list[h])
else:
X_list[h] = np.empty((0, full_X.shape[1]))
y_list[h] = np.array([])
return X_list, y_list, idx_list
# 5. 訓練 RF(direct)去預測殘差(在 train 區)
def train_rf_residuals(full_X, df, train_end, H, lags, arima_fitted_train):
# residual series: r_t = y_t - arima_fitted_t (only exists for t in train)
residual_series = pd.Series(index=df.index, dtype=float)
# align fittedvalues index with train index
residual_series.iloc[:train_end] = (df['y'].iloc[:train_end] - arima_fitted_train).values
X_train_list, y_train_list, idx_train_list = prepare_direct_dataset(full_X, df['y'], 0, train_end, H, lags)
y_resid_train_list = {}
for h in range(1, H+1):
idxs = idx_train_list[h]
y_resid = []
valid_rows = []
for i, idx in enumerate(idxs):
t_idx = df.index.get_loc(idx) + h
if t_idx < train_end:
res_val = residual_series.iloc[t_idx]
ifnot np.isnan(res_val):
y_resid.append(res_val)
valid_rows.append(i)
if len(valid_rows) == 0:
X_valid = np.empty((0, full_X.shape[1]))
else:
X_valid = X_train_list[h][valid_rows]
y_resid_train_list[h] = (X_valid, np.array(y_resid))
rf_models = {}
rf_params = {'n_estimators': 100, 'max_depth': 8, 'random_state': 42, 'n_jobs': -1}
for h in range(1, H+1):
X_h, y_h = y_resid_train_list[h]
if X_h.shape[0] < 8:
rf_models[h] = None
continue
rf = RandomForestRegressor(**rf_params)
rf.fit(X_h, y_h)
rf_models[h] = rf
return rf_models
# 6. 用 validation 構造 ARIMA/RF 對並訓練 PyTorch combiner(或 Ridge 退化)
def train_combiner_on_validation(arima_model_train, rf_models, full_X, df, train_end, val_end, H, lags, use_pytorch=True):
# 為 validation 時刻 t (train_end <= t < val_end - H) 構造 (arima_h_pred, rf_h_pred, y_{t+h})
X_val_list, y_val_list, idx_val_list = prepare_direct_dataset(full_X, df['y'], train_end, val_end, H, lags)
pairs = []
for h in range(1, H+1):
X_h = X_val_list[h]
y_h = y_val_list[h]
idxs = idx_val_list[h]
if X_h.shape[0] == 0:
continue
# RF preds for these X_h
rf_preds = np.zeros(len(y_h))
if rf_models.get(h) isnotNone:
rf_preds = rf_models[h].predict(X_h)
# ARIMA preds: use ARIMA fitted on train to produce h-step forecast (from end of train)
arima_h_pred = float(arima_model_train.forecast(steps=h).iloc[-1])
arima_preds = np.array([arima_h_pred] * len(y_h))
for i in range(len(y_h)):
pairs.append((arima_preds[i], rf_preds[i], y_h[i], idxs[i], h))
if len(pairs) == 0:
# no data in validation to train combiner; fallback to simple sum combiner
returnNone, None, None
comb_X = np.array([[p[0], p[1]] for p in pairs], dtype=np.float32)
comb_y = np.array([p[2] for p in pairs], dtype=np.float32).reshape(-1,1)
mean_X = comb_X.mean(axis=0, keepdims=True)
std_X = comb_X.std(axis=0, keepdims=True) + 1e-8
comb_X_norm = (comb_X - mean_X) / std_X
if use_pytorch:
try:
# Build simple PyTorch net
class CombinerNet(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(2, 16),
nn.ReLU(),
nn.Linear(16, 8),
nn.ReLU(),
nn.Linear(8, 1)
)
def forward(self, x):
return self.net(x)
model = CombinerNet()
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.MSELoss()
X_tensor = torch.tensor(comb_X_norm, dtype=torch.float32)
y_tensor = torch.tensor(comb_y, dtype=torch.float32)
dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
loader = torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=True)
EPOCHS = 120
model.train()
for epoch in range(EPOCHS):
total_loss = 0.0
for Xb, yb in loader:
optimizer.zero_grad()
pred = model(Xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
total_loss += loss.item() * Xb.size(0)
if (epoch+1) % 40 == 0:
print(f"[Combiner PyTorch] epoch {epoch+1}/{EPOCHS}, loss={total_loss/len(dataset):.6f}")
return model, mean_X, std_X
except Exception as e:
print("PyTorch combiner training failed, falling back to Ridge. Error:", e)
use_pytorch = False
# fallback: Ridge regression (線性可解釋組合)
ridge = Ridge(alpha=1.0)
ridge.fit(comb_X, comb_y.ravel())
return ridge, mean_X, std_X
# 7. 預測與評估(在 test 前,會先用 train+val 重新訓練 ARIMA 和 RF)
def retrain_on_trainval_and_forecast(df, train_end, val_end, H, lags, rf_params=None, arima_order=(2,0,1),
combiner=None, comb_mean=None, comb_std=None):
# retrain ARIMA on train+val
trainval = df.iloc[:val_end].copy()
arima_tv = ARIMA(trainval['y'], order=arima_order).fit()
# recompute residuals on train+val
fitted_tv = arima_tv.fittedvalues
trainval['resid'] = trainval['y'] - fitted_tv
# retrain RF on train+val residuals
full_X = make_time_features(df, lags=lags)
X_tv_list, y_tv_list, idx_tv_list = prepare_direct_dataset(full_X, df['y'], 0, val_end, H, lags)
y_resid_tv_list = {}
for h in range(1, H+1):
idxs = idx_tv_list[h]
y_resid = []
valid_rows = []
for i, idx in enumerate(idxs):
t_idx = df.index.get_loc(idx) + h
if t_idx < val_end:
res_val = trainval['resid'].iloc[t_idx] if idx in trainval.index else np.nan
ifnot np.isnan(res_val):
y_resid.append(res_val)
valid_rows.append(i)
if len(valid_rows) == 0:
X_valid = np.empty((0, full_X.shape[1]))
else:
X_valid = X_tv_list[h][valid_rows]
y_resid_tv_list[h] = (X_valid, np.array(y_resid))
rf_models_tv = {}
if rf_params isNone:
rf_params = {'n_estimators': 100, 'max_depth': 8, 'random_state': 42, 'n_jobs': -1}
for h in range(1, H+1):
X_h, y_h = y_resid_tv_list[h]
if X_h.shape[0] < 8:
rf_models_tv[h] = None
continue
rf = RandomForestRegressor(**rf_params)
rf.fit(X_h, y_h)
rf_models_tv[h] = rf
# Forecast from origin = last index of train+val (val_end - 1)
origin_idx = val_end - 1
full_X = make_time_features(df, lags=lags)
X_origin = full_X.iloc[origin_idx].values.reshape(1, -1)
arima_fore = arima_tv.forecast(steps=H)
preds = {'h': [], 'arima': [], 'rf_resid': [], 'sum': [], 'combiner': [], 'true': []}
for h in range(1, H+1):
arima_h = float(arima_fore[h-1])
rf_h = float(rf_models_tv[h].predict(X_origin)[0]) if rf_models_tv.get(h) isnotNoneelse0.0
sum_pred = arima_h + rf_h
if combiner isnotNone:
# prepare combiner input (normalize using comb_mean/comb_std)
# For sklearn Ridge combiner: combiner expects raw (not normalized) inputs; we trained it on raw comb_X
if USE_PYTORCH and hasattr(combiner, 'net'):
comb_in = (np.array([arima_h, rf_h], dtype=np.float32).reshape(1,2) - comb_mean) / comb_std
import torch
comb_in_t = torch.tensor(comb_in, dtype=torch.float32)
combiner.eval()
with torch.no_grad():
comb_pred = float(combiner(comb_in_t).numpy().reshape(-1)[0])
else:
# Ridge expects raw inputs
comb_pred = float(combiner.predict(np.array([[arima_h, rf_h]]))[0])
else:
comb_pred = sum_pred
true_idx = origin_idx + h
true_val = df['y'].iloc[true_idx] if true_idx < len(df) else np.nan
preds['h'].append(h)
preds['arima'].append(arima_h)
preds['rf_resid'].append(rf_h)
preds['sum'].append(sum_pred)
preds['combiner'].append(comb_pred)
preds['true'].append(true_val)
results = pd.DataFrame(preds)
mask = ~results['true'].isna()
def metrics(y_true, y_pred):
return {'RMSE': np.sqrt(mean_squared_error(y_true, y_pred)), 'MAE': mean_absolute_error(y_true, y_pred)}
metrics_arima = metrics(results.loc[mask, 'true'], results.loc[mask, 'arima'])
metrics_sum = metrics(results.loc[mask, 'true'], results.loc[mask, 'sum'])
metrics_comb = metrics(results.loc[mask, 'true'], results.loc[mask, 'combiner'])
return results, (metrics_arima, metrics_sum, metrics_comb), arima_tv, rf_models_tv
# 8. 主流程:生成數據、訓練、繪圖
def main_demo():
# 生成數據
df = generate_synthetic_series(N=1200, seed=2025)
train_end = 900
val_end = 1030
H = 12
lags = 24
# time split
train, val, test = time_split(df, train_end=train_end, val_end=val_end)
# fit ARIMA on train and compute residuals
arima_model, arima_fitted, residual = fit_arima_and_residual(train['y'], order=(2,0,1))
train['arima_fitted'] = arima_fitted
train['residual'] = residual
# features
full_X = make_time_features(df, lags=lags)
# train RF on residuals (train only)
rf_models = train_rf_residuals(full_X, df, train_end, H, lags, arima_fitted)
# train combiner on validation; if PyTorch available we try PyTorch, else Ridge
combiner, comb_mean, comb_std = train_combiner_on_validation(arima_model, rf_models, full_X, df, train_end, val_end, H, lags, use_pytorch=USE_PYTORCH)
# retrain on train+val and forecast on test-origin
results, metrics, arima_tv, rf_models_tv = retrain_on_trainval_and_forecast(df, train_end, val_end, H, lags, rf_params=None, arima_order=(2,0,1), combiner=combiner, comb_mean=comb_mean, comb_std=comb_std)
print("\nEvaluation (origin = last point of train+val):")
print("ARIMA-only:", metrics[0])
print("ARIMA+RF (sum):", metrics[1])
print("ARIMA+RF (combiner):", metrics[2])
#---------- 繪圖(至少 6 個圖)----------
plt.rcParams['figure.figsize'] = (12,6)
# 圖1: 完整時間序列,並標註 train/val/test
fig1, ax1 = plt.subplots()
ax1.plot(df.index, df['y'], label='Observed (y)', color='crimson', linewidth=1.1)
ax1.plot(train.index, train['arima_fitted'], label='ARIMA fitted (train)', color='navy', linewidth=1.0)
ax1.axvspan(df.index[0], df.index[train_end-1], alpha=0.12, color='gold', label='Train region')
ax1.axvspan(df.index[train_end], df.index[val_end-1], alpha=0.10, color='orchid', label='Validation region')
ax1.axvspan(df.index[val_end], df.index[-1], alpha=0.08, color='lightgreen', label='Test region')
ax1.set_title("圖 1:完整時間序列與訓練集內 ARIMA 擬合")
ax1.set_ylabel("y")
ax1.legend(); ax1.grid(alpha=0.2)
# 圖2: 真實構成部分 vs ARIMA 擬合(show linear/seasonal/nonlinear)
fig2, ax2 = plt.subplots()
ax2.plot(df.index, df['linear'], label='True linear', color='darkorange', linewidth=1.2)
ax2.plot(df.index, df['seasonal'], label='Seasonal', color='teal', linewidth=1.0, linestyle='--')
ax2.plot(df.index, df['nonlinear'], label='True nonlinear', color='purple', linewidth=1.0, linestyle=':')
ax2.plot(train.index, train['arima_fitted'], label='ARIMA fitted (train)', color='navy', linewidth=1.0)
ax2.set_title("圖 2:真實線性/季節/非線性分量 與 ARIMA 的擬合能力對比")
ax2.legend(); ax2.grid(alpha=0.15)
# 圖3: multi-step predictions vs true (per horizon)
fig3, ax3 = plt.subplots()
ax3.plot(results['h'], results['true'], label='True', marker='o', color='black')
ax3.plot(results['h'], results['arima'], label='ARIMA', marker='s', color='navy')
ax3.plot(results['h'], results['sum'], label='ARIMA + RF (sum)', marker='D', color='crimson')
ax3.plot(results['h'], results['combiner'], label='Combiner', marker='^', color='limegreen')
ax3.set_xlabel('h (forecast horizon)'); ax3.set_ylabel('value')
ax3.set_title("圖 3:多步預測(h=1..H)對比:True / ARIMA / ARIMA+RF / Combiner")
ax3.legend(); ax3.grid(alpha=0.2)
# 圖4: 各步長殘差(true - pred),看偏差與誤差隨步長增長
fig4, ax4 = plt.subplots()
ax4.plot(results['h'], results['true'] - results['arima'], label='Error ARIMA', marker='o', color='navy')
ax4.plot(results['h'], results['true'] - results['sum'], label='Error ARIMA+RF(sum)', marker='D', color='crimson')
ax4.plot(results['h'], results['true'] - results['combiner'], label='Error Combiner', marker='^', color='limegreen')
ax4.axhline(0, color='gray', linestyle='--')
ax4.set_xlabel('h'); ax4.set_ylabel('error (true - pred)'); ax4.set_title("圖 4:殘差隨步長變化")
ax4.legend(); ax4.grid(alpha=0.2)
# 圖5: RF 特徵重要性(選一個 h 展示)
chosen_h = 3
rf_chosen = rf_models_tv.get(chosen_h, None)
fig5, ax5 = plt.subplots()
if rf_chosen isnotNone:
importances = rf_chosen.feature_importances_
feat_names = full_X.columns.tolist()
# show top 12 features
idxs = np.argsort(importances)[-12:][::-1]
ax5.barh([feat_names[i] for i in idxs][::-1], importances[idxs][::-1], color='magenta')
ax5.set_title(f"圖 5:RF (h={chosen_h}) 特徵重要性(前12)")
ax5.set_xlabel("importance")
else:
ax5.text(0.1, 0.5, f"No RF trained for h={chosen_h}", fontsize=12)
ax5.grid(alpha=0.15, axis='x')
# 圖6: Combiner 預測 vs 真實(散點圖)
fig6, ax6 = plt.subplots()
mask = ~results['true'].isna()
ax6.scatter(results.loc[mask,'true'], results.loc[mask,'combiner'], s=90, c='crimson', edgecolor='k', alpha=0.9)
mn = min(results.loc[mask,'true'].min(), results.loc[mask,'combiner'].min())
mx = max(results.loc[mask,'true'].max(), results.loc[mask,'combiner'].max())
ax6.plot([mn,mx],[mn,mx], linestyle='--', color='navy')
ax6.set_xlabel('True'); ax6.set_ylabel('Combiner Pred'); ax6.set_title("圖 6:Combiner 預測 vs 真實(散點)")
ax6.grid(alpha=0.2)
plt.show()
# 打印 per-horizon 表 (便於直觀比較)
display_df = results.copy()
display_df['abs_err_arima'] = np.abs(display_df['true'] - display_df['arima'])
display_df['abs_err_sum'] = np.abs(display_df['true'] - display_df['sum'])
display_df['abs_err_comb'] = np.abs(display_df['true'] - display_df['combiner'])
print("\nPer-horizon comparison (h, arima, rf_resid, sum, combiner, true, abs_errs):")
print(display_df[['h','arima','rf_resid','sum','combiner','true','abs_err_arima','abs_err_sum','abs_err_comb']])
if __name__ == "__main__":
main_demo()
- 合成數據
linear:一個 AR(2)-like 的線性成分,用以模擬 ARIMA 擅長擬合的部分。seasonal:短週期季節(週期7),類似周效果。nonlinear:緩慢變化的週期 + 局部突變(用 tanh 和 Gaussian 衰減),製造非線性結構,讓 RF/非線性模型能發揮作用。noise:異方差噪聲(幅度隨時間輕微變化),增加現實感。
- 時間劃分 & 避免數據泄露
-
嚴格按時間劃分成 train / val / test。
-
構造特徵時只用
y的滯後(lag_1到lag_L)以及滾動均值/方差,這些特徵在時間點 t 僅依賴於 t 或之前的信息(shift 保證不會用到未來)。 -
在訓練 RF 預測殘差時,目標是 residual_{t+h},而樣本的特徵僅來自 t。對於訓練樣本的採集,確保
t+h仍在訓練區間內才能被用來訓練 RF;這點在函數prepare_direct_dataset/train_rf_residuals中嚴格檢查。
-
ARIMA(線性基底)
-
在訓練集上擬合 ARIMA,只用訓練數據(不會泄露驗證或測試信息)。
-
使用訓練集內的一步擬合值
fittedvalues來構造殘差序列,這些殘差即是 RF 要學習的目標(“ARIMA 去掉線性後的剩餘”)。
-
Random Forest(非線性部分)
-
使用 direct 策略:為每個 horizon 單獨訓練一個 RF(即 RF_h 預測殘差 r_{t+h}),避免迭代預測中的誤差累積。
-
RF 的輸入只包括滯後、滾動統計、日曆特徵(全部在時刻 t 可用)。
-
Combiner(PyTorch / Ridge)
-
在 validation 上構造
(arima_pred_h, rf_pred_h) -> y_true的配對樣本,用來學習一個小模型(combiner)把 ARIMA 與 RF 的輸出組合成更準的最終預測。 -
我們嘗試優先用 PyTorch 訓練一個小的 MLP(非線性組合),若環境不支持則回退到 Ridge(線性迴歸,帶正則化)。
-
Combiner 只用 validation 數據訓練(相當於調參/校準),最後在做最終預測前,我們將 ARIMA 與 RF 在 train+val 上重新訓練(擴充數據),但不在此階段重新訓練 combiner(避免泄露 test 信息)。
-
最終預測策略
-
在
retrain_on_trainval_and_forecast中:先把 ARIMA 在 train+val 上重訓練,再在同一區間訓練 RF_h(以便在實際部署時儘量利用更多數據);預測 origin 取為 train+val 的最後一個點,然後一次性輸出 horizons 1..H 的預測(direct 多步)。 -
這樣符合實際工程做法:訓練/調參在 train/val;最後在 train+val 上訓練最終模型並在 test(未見)上評估。
數據可視化
圖 1:完整時間序列 + 訓練區間內 ARIMA 擬合
展示全序列與 ARIMA 在訓練集內的擬合效果。ARIMA 應該能抓住長期線性趨勢與一部分季節成分,但通常無法完全跟蹤複雜非線性波動。通過圖可以直觀判斷 ARIMA 與真實值的差異在哪裏。
圖 2:真實組成部分(linear / seasonal / nonlinear)對比 ARIMA 擬合
因為我們是用虛擬數據,知道真實分量。此圖把“ARIMA 擅長擬合的線性部分”與真實線性、季節及非線性項對比,幫助理解為什麼還需要非線性模型(RF)來補充。
圖 3:多步預測對比(h=1..H):True / ARIMA / ARIMA+RF(sum) / Combiner
逐步比較不同方法隨步長的預測值。直觀看出:在某些步長上,ARIMA+RF(殘差堆疊)會明顯優於 ARIMA;Combiner(如果訓練得好)通常能進一步微調輸出,減少偏差。這個圖能讓你快速判斷哪種方法在短期或中期表現更好。
圖 4:殘差隨步長變化(真實 - 預測)
殘差曲線幫我們看預測是系統性偏高還是偏低(偏差),以及誤差隨步長增長是否擴散(是否存在誤差累積問題)。若 RF/Combiner 有助於減小偏差,圖上會看到殘差更靠近 0。
圖 5:RF 特徵重要性(某個選定步長 h)
顯示 RF 在該步長下最依賴哪些特徵(尤其是滯後幾階)。通常,我們會看到最近的若干滯後(lag_1, lag_2, lag_7 等)重要性高,這驗證 RF 利用滯後信息捕捉非線性模式。對特徵工程優化很有幫助。
圖 6:Combiner 預測 vs 真實(散點圖)
檢視 Combiner 預測的整體偏差和離散性。若點聚集在對角線附近,説明模型擬合良好。若偏離出現系統