## Overview

Feature engineering is a make-or-break stage of any machine learning project: high-quality features can improve model performance substantially. This article surveys the state of feature engineering techniques and practices as of 2025, covering the full workflow from basic data preprocessing to advanced feature generation.
## Feature Engineering Fundamentals

### Feature Types and Processing Methods
| Feature Type | Data Form | Typical Processing Methods | Caveats |
|---|---|---|---|
| Numeric | Continuous values | Standardization, normalization, discretization | Watch for outliers and distribution shape |
| Categorical | Discrete labels | One-hot encoding, label encoding, target encoding | Handle high-cardinality and unseen categories |
| Temporal | Timestamps | Cyclical encoding, time-delta computation | Account for time zones and holiday effects |
| Text | Strings | TF-IDF, word embeddings, topic models | Handle multiple languages and special characters |
| Spatial | Coordinates | Geocoding, distance computation | Mind projections and precision |
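The spatial row is the only type that does not get its own section later in this article, so here is a minimal sketch of a distance feature. The haversine formula gives the great-circle distance between two coordinates; the function and column names below are hypothetical:

```python
import numpy as np

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points."""
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return 2 * 6371.0 * np.arcsin(np.sqrt(a))

# e.g. distance from each record to a fixed reference point (here Beijing city center):
# df['dist_to_center_km'] = haversine_km(df['lat'], df['lon'], 39.9042, 116.4074)
```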
### Feature Engineering Workflow

```python
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif
import warnings
warnings.filterwarnings('ignore')

class FeatureEngineeringPipeline:
    """Basic feature engineering pipeline."""

    def __init__(self):
        self.numeric_scaler = StandardScaler()
        self.categorical_encoder = LabelEncoder()
        self.feature_selector = None

    def load_sample_data(self):
        """Load sample data."""
        np.random.seed(42)
        n_samples = 1000
        data = {
            'age': np.random.normal(35, 10, n_samples),
            'income': np.random.lognormal(10, 1, n_samples),
            'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n_samples),
            'city': np.random.choice(['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Other'], n_samples),
            'purchase_amount': np.random.exponential(100, n_samples),
            'target': np.random.choice([0, 1], n_samples, p=[0.7, 0.3])
        }
        return pd.DataFrame(data)

    def analyze_features(self, df):
        """Print a feature analysis report."""
        print("=== Feature Analysis Report ===")
        print(f"Dataset shape: {df.shape}")
        print("\nData type distribution:")
        print(df.dtypes.value_counts())
        print("\nMissing value counts:")
        missing_stats = df.isnull().sum()
        print(missing_stats[missing_stats > 0])
        print("\nNumeric feature statistics:")
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            print(df[numeric_cols].describe())
        return df

# Initialize the pipeline
pipeline = FeatureEngineeringPipeline()
sample_data = pipeline.load_sample_data()
pipeline.analyze_features(sample_data)
```
## Numeric Feature Processing

### Standardization and Normalization
```python
from sklearn.preprocessing import MinMaxScaler, RobustScaler
from scipy import stats

class NumericFeatureProcessor:
    """Processor for numeric features."""

    def __init__(self):
        self.scalers = {}
        self.transform_methods = {}

    def detect_outliers(self, series, method='iqr'):
        """Flag outliers using the IQR rule or z-scores."""
        if method == 'iqr':
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outliers = (series < lower_bound) | (series > upper_bound)
        elif method == 'zscore':
            z_scores = np.abs(stats.zscore(series))
            outliers = z_scores > 3
        else:
            raise ValueError("Unsupported outlier detection method")
        return outliers

    def handle_outliers(self, series, method='clip'):
        """Handle outliers by clipping, removal, or a log transform."""
        if method == 'clip':
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1
            return series.clip(Q1 - 1.5 * IQR, Q3 + 1.5 * IQR)
        elif method == 'remove':
            return series[~self.detect_outliers(series)]
        elif method == 'transform':
            return np.log1p(series)
        else:
            return series

    def scale_features(self, df, columns, method='standard'):
        """Scale the selected columns and remember the fitted scalers."""
        scaled_df = df.copy()
        for col in columns:
            if method == 'standard':
                scaler = StandardScaler()
            elif method == 'minmax':
                scaler = MinMaxScaler()
            elif method == 'robust':
                scaler = RobustScaler()
            else:
                raise ValueError("Unsupported scaling method")
            scaled_df[col] = scaler.fit_transform(df[[col]])
            self.scalers[col] = scaler
        return scaled_df

    def create_interaction_features(self, df, feature_pairs):
        """Create pairwise interaction features."""
        interaction_df = df.copy()
        for col1, col2 in feature_pairs:
            if col1 in df.columns and col2 in df.columns:
                # Multiplicative interaction
                interaction_df[f'{col1}_x_{col2}'] = df[col1] * df[col2]
                # Ratio interaction (guarding against division by zero)
                interaction_df[f'{col1}_div_{col2}'] = np.where(
                    df[col2] != 0, df[col1] / df[col2], 0
                )
        return interaction_df

    def create_polynomial_features(self, df, columns, degree=2):
        """Create polynomial features, keeping only columns that are new."""
        from sklearn.preprocessing import PolynomialFeatures
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        poly_features = poly.fit_transform(df[columns])
        feature_names = poly.get_feature_names_out(columns)
        poly_df = pd.DataFrame(poly_features, columns=feature_names, index=df.index)
        # Drop degree-1 terms that would duplicate existing columns
        poly_df = poly_df.loc[:, ~poly_df.columns.isin(df.columns)]
        return pd.concat([df, poly_df], axis=1)

# Numeric feature processing example
numeric_processor = NumericFeatureProcessor()
# Clip outliers
sample_data['income_cleaned'] = numeric_processor.handle_outliers(sample_data['income'])
# Scale features
scaled_data = numeric_processor.scale_features(sample_data, ['age', 'income_cleaned'])
print("Numeric feature processing complete")
print(f"Income range before processing: {sample_data['income'].min():.2f} - {sample_data['income'].max():.2f}")
print(f"Income range after clipping and scaling: {scaled_data['income_cleaned'].min():.2f} - {scaled_data['income_cleaned'].max():.2f}")
```
### Binning and Discretization

```python
class BinningProcessor:
    """Binning processor."""

    def equal_width_binning(self, series, n_bins=5):
        """Equal-width binning."""
        return pd.cut(series, bins=n_bins, labels=False)

    def equal_frequency_binning(self, series, n_bins=5):
        """Equal-frequency (quantile) binning."""
        return pd.qcut(series, q=n_bins, labels=False, duplicates='drop')

    def target_guided_binning(self, series, target, n_bins=5):
        """Quantile binning plus per-bin target statistics."""
        temp_df = pd.DataFrame({'feature': series, 'target': target})
        temp_df['bin'] = pd.qcut(temp_df['feature'], q=n_bins, labels=False, duplicates='drop')
        # Per-bin target rate; useful for checking monotonicity or for encoding
        bin_stats = temp_df.groupby('bin')['target'].mean()
        return temp_df['bin'].values, bin_stats

    def create_binning_features(self, df, numeric_columns, target_col=None, n_bins=5):
        """Create binned versions of the given numeric columns."""
        binning_df = df.copy()
        for col in numeric_columns:
            if col in df.columns:
                # Equal-width binning
                binning_df[f'{col}_eq_width'] = self.equal_width_binning(df[col], n_bins)
                # Equal-frequency binning
                binning_df[f'{col}_eq_freq'] = self.equal_frequency_binning(df[col], n_bins)
                # Target-guided binning when a target column is available
                if target_col and target_col in df.columns:
                    bins, _ = self.target_guided_binning(df[col], df[target_col], n_bins)
                    binning_df[f'{col}_target_guide'] = bins
        return binning_df

# Binning example
binning_processor = BinningProcessor()
binned_data = binning_processor.create_binning_features(
    sample_data, ['age', 'income'], 'target', n_bins=4
)
print("\nBinned features created")
print("Age bin distribution (equal width):")
print(binned_data['age_eq_width'].value_counts().sort_index())
```
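The per-bin target rates returned by `target_guided_binning` can be turned into a Weight-of-Evidence (WOE) encoding, a standard companion to supervised binning in credit scoring. A minimal sketch, assuming a binary 0/1 target (the `woe_encode` helper is hypothetical):

```python
def woe_encode(bins, target, eps=0.5):
    """Map each bin to ln(event share / non-event share) with additive smoothing."""
    temp = pd.DataFrame({'bin': bins, 'target': target})
    grouped = temp.groupby('bin')['target']
    events = grouped.sum() + eps
    non_events = grouped.count() - grouped.sum() + eps
    woe = np.log((events / events.sum()) / (non_events / non_events.sum()))
    return temp['bin'].map(woe)

binned_data['age_woe'] = woe_encode(binned_data['age_target_guide'], binned_data['target'])
print(binned_data[['age_target_guide', 'age_woe']].drop_duplicates().sort_values('age_target_guide'))
```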
## Categorical Feature Encoding

### Comparing Encoding Techniques
```python
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from category_encoders import TargetEncoder

class CategoricalFeatureEncoder:
    """Encoder for categorical features."""

    def __init__(self):
        self.encoders = {}
        self.encoding_methods = {}

    def one_hot_encoding(self, df, columns, max_categories=50):
        """One-hot encoding with a cardinality guard."""
        encoded_df = df.copy()
        for col in columns:
            if col in df.columns:
                # Skip high-cardinality columns
                unique_count = df[col].nunique()
                if unique_count > max_categories:
                    print(f"Warning: {col} has {unique_count} categories; skipping one-hot encoding")
                    continue
                encoder = OneHotEncoder(sparse_output=False, drop='first')
                encoded_features = encoder.fit_transform(df[[col]])
                feature_names = [f"{col}_{cat}" for cat in encoder.categories_[0][1:]]
                encoded_features_df = pd.DataFrame(
                    encoded_features, columns=feature_names, index=df.index
                )
                encoded_df = pd.concat([encoded_df, encoded_features_df], axis=1)
                encoded_df.drop(col, axis=1, inplace=True)
                self.encoders[col] = encoder
        return encoded_df

    def target_encoding(self, df, columns, target_col, smoothing=1.0):
        """Target (mean) encoding with smoothing."""
        encoded_df = df.copy()
        for col in columns:
            if col in df.columns and target_col in df.columns:
                encoder = TargetEncoder(smoothing=smoothing)
                encoded_df[f'{col}_target_enc'] = encoder.fit_transform(
                    df[col], df[target_col]
                )
                self.encoders[f'{col}_target'] = encoder
        return encoded_df

    def frequency_encoding(self, df, columns):
        """Frequency encoding."""
        encoded_df = df.copy()
        for col in columns:
            if col in df.columns:
                freq_encoding = df[col].value_counts(normalize=True)
                encoded_df[f'{col}_freq_enc'] = df[col].map(freq_encoding)
        return encoded_df

    def label_encoding(self, df, columns):
        """Label encoding."""
        encoded_df = df.copy()
        for col in columns:
            if col in df.columns:
                encoder = LabelEncoder()
                encoded_df[f'{col}_label_enc'] = encoder.fit_transform(df[col])
                self.encoders[f'{col}_label'] = encoder
        return encoded_df

    def compare_encoding_methods(self, df, feature_col, target_col):
        """Apply several encodings to one column for side-by-side comparison."""
        results = {}
        # Label encoding
        le = LabelEncoder()
        results['Label Encoding'] = le.fit_transform(df[feature_col])
        # Target encoding
        te = TargetEncoder()
        results['Target Encoding'] = te.fit_transform(df[feature_col], df[target_col])
        # Frequency encoding
        freq_map = df[feature_col].value_counts(normalize=True)
        results['Frequency Encoding'] = df[feature_col].map(freq_map)
        return results

# Categorical encoding example
categorical_encoder = CategoricalFeatureEncoder()
categorical_cols = ['education', 'city']
# One-hot encoding
onehot_encoded = categorical_encoder.one_hot_encoding(sample_data, categorical_cols)
# Target encoding
target_encoded = categorical_encoder.target_encoding(
    sample_data, categorical_cols, 'target'
)
print("\nCategorical feature encoding complete")
print(f"Feature count after one-hot encoding: {onehot_encoded.shape[1]}")
print(f"Feature count after target encoding: {target_encoded.shape[1]}")
```
## Time Feature Engineering

### Extracting Time Features
```python
class TimeFeatureEngineer:
    """Time feature engineering."""

    def create_time_features(self, df, time_column):
        """Create calendar and cyclical features from a timestamp column."""
        time_df = df.copy()
        if time_column in df.columns:
            # Ensure the column is datetime
            time_df[time_column] = pd.to_datetime(time_df[time_column])
            # Basic calendar features
            time_df[f'{time_column}_year'] = time_df[time_column].dt.year
            time_df[f'{time_column}_month'] = time_df[time_column].dt.month
            time_df[f'{time_column}_day'] = time_df[time_column].dt.day
            time_df[f'{time_column}_hour'] = time_df[time_column].dt.hour
            time_df[f'{time_column}_dayofweek'] = time_df[time_column].dt.dayofweek
            time_df[f'{time_column}_quarter'] = time_df[time_column].dt.quarter
            time_df[f'{time_column}_is_weekend'] = time_df[time_column].dt.dayofweek.isin([5, 6]).astype(int)
            # Cyclical (sin/cos) encoding to preserve periodicity
            time_df[f'{time_column}_month_sin'] = np.sin(2 * np.pi * time_df[time_column].dt.month / 12)
            time_df[f'{time_column}_month_cos'] = np.cos(2 * np.pi * time_df[time_column].dt.month / 12)
            time_df[f'{time_column}_day_sin'] = np.sin(2 * np.pi * time_df[time_column].dt.day / 31)
            time_df[f'{time_column}_day_cos'] = np.cos(2 * np.pi * time_df[time_column].dt.day / 31)
        return time_df

    def create_time_based_features(self, df, time_column, value_columns):
        """Create rolling and lag features over time."""
        time_df = df.copy()
        time_df[time_column] = pd.to_datetime(time_df[time_column])
        time_df = time_df.sort_values(time_column)
        for value_col in value_columns:
            if value_col in df.columns:
                # Rolling mean
                time_df[f'{value_col}_rolling_mean_7'] = time_df[value_col].rolling(
                    window=7, min_periods=1
                ).mean()
                # Rolling standard deviation
                time_df[f'{value_col}_rolling_std_7'] = time_df[value_col].rolling(
                    window=7, min_periods=1
                ).std()
                # Lag features
                time_df[f'{value_col}_lag_1'] = time_df[value_col].shift(1)
                time_df[f'{value_col}_lag_7'] = time_df[value_col].shift(7)
        return time_df

    def create_seasonal_features(self, df, time_column):
        """Create seasonal and business-cycle features."""
        time_df = df.copy()
        time_df[time_column] = pd.to_datetime(time_df[time_column])
        # Season of the year (1-4)
        time_df['season'] = (time_df[time_column].dt.month % 12 + 3) // 3
        # Holiday flag (stub; real applications need an actual holiday calendar)
        time_df['is_holiday'] = 0
        # Business-cycle boundaries
        time_df['is_month_start'] = time_df[time_column].dt.is_month_start.astype(int)
        time_df['is_month_end'] = time_df[time_column].dt.is_month_end.astype(int)
        time_df['is_quarter_start'] = time_df[time_column].dt.is_quarter_start.astype(int)
        time_df['is_quarter_end'] = time_df[time_column].dt.is_quarter_end.astype(int)
        time_df['is_year_start'] = time_df[time_column].dt.is_year_start.astype(int)
        time_df['is_year_end'] = time_df[time_column].dt.is_year_end.astype(int)
        return time_df

# Build sample time series data
def create_time_series_data():
    """Create sample time series data."""
    dates = pd.date_range('2023-01-01', '2024-01-01', freq='D')
    n_samples = len(dates)
    data = {
        'date': dates,
        'value': np.random.normal(100, 20, n_samples) +
                 10 * np.sin(2 * np.pi * np.arange(n_samples) / 30),  # monthly cycle
        'category': np.random.choice(['A', 'B', 'C'], n_samples)
    }
    return pd.DataFrame(data)

# Time feature engineering example
time_engineer = TimeFeatureEngineer()
time_data = create_time_series_data()
time_features = time_engineer.create_time_features(time_data, 'date')
time_features = time_engineer.create_seasonal_features(time_features, 'date')
print("\nTime feature engineering complete")
print(f"Number of time features: {time_features.shape[1]}")
print("Sample time features:")
print(time_features[['date_year', 'date_month', 'season', 'date_is_weekend']].head())
```
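The `is_holiday` stub can be filled in with a real calendar. A minimal sketch using the third-party `holidays` package (an assumption on my part; any holiday table keyed by date works the same way):

```python
import holidays  # pip install holidays

cn_holidays = holidays.country_holidays('CN')  # mainland China public holidays
time_features['is_holiday'] = time_features['date'].dt.date.map(
    lambda d: int(d in cn_holidays)
)
print(f"Holiday days in sample: {time_features['is_holiday'].sum()}")
```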
## Text Feature Engineering

### Text Feature Extraction Techniques
```python
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
import re

class TextFeatureEngineer:
    """Text feature engineering."""

    def __init__(self):
        self.vectorizers = {}
        self.text_stats = {}

    def clean_text(self, text_series):
        """Lowercase and strip punctuation and digits."""
        cleaned_text = text_series.str.lower()
        cleaned_text = cleaned_text.str.replace(r'[^\w\s]', '', regex=True)
        cleaned_text = cleaned_text.str.replace(r'\d+', '', regex=True)
        return cleaned_text.str.strip()

    def create_basic_text_features(self, text_series):
        """Create surface-level text statistics."""
        features = {}
        # Length features
        features['text_length'] = text_series.str.len()
        features['word_count'] = text_series.str.split().str.len()
        features['char_count'] = text_series.str.replace(' ', '').str.len()
        # Guard against division by zero for empty strings
        features['avg_word_length'] = (
            features['char_count'] / features['word_count'].replace(0, np.nan)
        ).fillna(0)
        # Character-class counts
        features['digit_count'] = text_series.str.count(r'\d')
        features['uppercase_count'] = text_series.str.count(r'[A-Z]')
        features['special_char_count'] = text_series.str.count(r'[^\w\s]')
        return pd.DataFrame(features)

    def tfidf_vectorization(self, text_series, max_features=100):
        """TF-IDF vectorization with unigrams and bigrams."""
        tfidf = TfidfVectorizer(
            max_features=max_features,
            stop_words='english',
            ngram_range=(1, 2)
        )
        tfidf_matrix = tfidf.fit_transform(text_series)
        feature_names = tfidf.get_feature_names_out()
        tfidf_df = pd.DataFrame(
            tfidf_matrix.toarray(),
            columns=[f'tfidf_{name}' for name in feature_names],
            index=text_series.index
        )
        self.vectorizers['tfidf'] = tfidf
        return tfidf_df

    def count_vectorization(self, text_series, max_features=50):
        """Bag-of-words count vectorization."""
        count_vec = CountVectorizer(
            max_features=max_features,
            stop_words='english',
            ngram_range=(1, 1)
        )
        count_matrix = count_vec.fit_transform(text_series)
        feature_names = count_vec.get_feature_names_out()
        count_df = pd.DataFrame(
            count_matrix.toarray(),
            columns=[f'count_{name}' for name in feature_names],
            index=text_series.index
        )
        self.vectorizers['count'] = count_vec
        return count_df

    def create_advanced_text_features(self, text_series):
        """Create lexical-richness and sentence-complexity features."""
        features = {}

        # Type-token ratio
        def lexical_richness(text):
            words = text.split()
            if len(words) == 0:
                return 0
            return len(set(words)) / len(words)

        features['lexical_richness'] = text_series.apply(lexical_richness)

        # Average number of words per sentence
        def sentence_complexity(text):
            sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
            if len(sentences) == 0:
                return 0
            return sum(len(s.split()) for s in sentences) / len(sentences)

        features['sentence_complexity'] = text_series.apply(sentence_complexity)
        return pd.DataFrame(features)

# Build sample text data
def create_text_data():
    """Create sample text data."""
    texts = [
        "This is a great product with amazing features",
        "I love using this software for my daily work",
        "The user interface is very intuitive and easy to use",
        "This application has some performance issues",
        "Excellent customer support and quick response",
        "Needs improvement in documentation and tutorials"
    ]
    return pd.DataFrame({'text': texts * 50})  # replicate to enlarge the sample

# Text feature engineering example
text_engineer = TextFeatureEngineer()
text_data = create_text_data()
# Clean the text
cleaned_text = text_engineer.clean_text(text_data['text'])
# Basic statistics
basic_features = text_engineer.create_basic_text_features(cleaned_text)
# TF-IDF features
tfidf_features = text_engineer.tfidf_vectorization(cleaned_text, max_features=20)
# Combine all text features
all_text_features = pd.concat([basic_features, tfidf_features], axis=1)
print("\nText feature engineering complete")
print(f"Number of text features: {all_text_features.shape[1]}")
print("Sample text features:")
print(all_text_features.iloc[:, :5].head())
```
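The feature-type table mentions topic models; a lightweight stand-in is latent semantic analysis (LSA), i.e. truncated SVD over the TF-IDF matrix. A minimal sketch on the features built above:

```python
from sklearn.decomposition import TruncatedSVD

# Compress the TF-IDF matrix into a few dense "topic" dimensions
svd = TruncatedSVD(n_components=5, random_state=42)
topic_features = pd.DataFrame(
    svd.fit_transform(tfidf_features),
    columns=[f'topic_{i}' for i in range(5)],
    index=tfidf_features.index
)
print(f"Explained variance ratio: {svd.explained_variance_ratio_.sum():.2f}")
```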
## Feature Selection Techniques

### Comparing Selection Methods
```python
from sklearn.feature_selection import RFE, SelectFromModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import Lasso
from sklearn.feature_selection import mutual_info_classif

class FeatureSelector:
    """Feature selector."""

    def __init__(self):
        self.selector_info = {}

    def correlation_analysis(self, df, target_col, threshold=0.8):
        """Find highly correlated feature pairs and target correlations."""
        corr_matrix = df.corr(numeric_only=True)
        # Highly correlated feature pairs
        high_corr_pairs = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i + 1, len(corr_matrix.columns)):
                if abs(corr_matrix.iloc[i, j]) > threshold:
                    high_corr_pairs.append((
                        corr_matrix.columns[i],
                        corr_matrix.columns[j],
                        corr_matrix.iloc[i, j]
                    ))
        # Correlation with the target
        target_corr = corr_matrix[target_col].abs().sort_values(ascending=False)
        return {
            'high_correlation_pairs': high_corr_pairs,
            'target_correlation': target_corr
        }

    def univariate_selection(self, X, y, k=10, method='f_classif'):
        """Univariate feature selection."""
        if method == 'f_classif':
            selector = SelectKBest(score_func=f_classif, k=k)
        elif method == 'mutual_info':
            selector = SelectKBest(score_func=mutual_info_classif, k=k)
        else:
            raise ValueError("Unsupported selection method")
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()]
        # Scores of the selected features only
        feature_scores = selector.scores_[selector.get_support()]
        self.selector_info['univariate'] = selector
        return X_selected, selected_features, feature_scores

    def recursive_elimination(self, X, y, n_features=10):
        """Recursive feature elimination (RFE)."""
        estimator = RandomForestClassifier(n_estimators=100, random_state=42)
        selector = RFE(estimator=estimator, n_features_to_select=n_features)
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()]
        feature_ranking = selector.ranking_
        self.selector_info['rfe'] = selector
        return X_selected, selected_features, feature_ranking

    def model_based_selection(self, X, y, threshold='mean'):
        """Model-based selection using Lasso coefficients."""
        lasso = Lasso(alpha=0.01, random_state=42)
        selector = SelectFromModel(lasso, threshold=threshold)
        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()]
        feature_importance = selector.estimator_.coef_
        self.selector_info['lasso'] = selector
        return X_selected, selected_features, feature_importance

    def compare_selection_methods(self, X, y):
        """Compare several selection methods on the same data."""
        results = {}
        # Univariate selection
        _, uni_features, uni_scores = self.univariate_selection(X, y, k=5)
        results['Univariate'] = {
            'features': uni_features,
            'scores': dict(zip(uni_features, uni_scores))
        }
        # Recursive elimination
        _, rfe_features, rfe_ranking = self.recursive_elimination(X, y, n_features=5)
        results['RFE'] = {
            'features': rfe_features,
            'ranking': dict(zip(X.columns, rfe_ranking))
        }
        # Model-based selection
        _, model_features, model_importance = self.model_based_selection(X, y)
        results['Model-Based'] = {
            'features': model_features,
            'importance': dict(zip(X.columns, model_importance))
        }
        return results

# Feature selection example
feature_selector = FeatureSelector()
# Prepare data (reusing the text features built above)
X = all_text_features.fillna(0)
y = np.random.choice([0, 1], len(X))  # random target for demonstration
# Correlation analysis
corr_results = feature_selector.correlation_analysis(
    pd.concat([X, pd.Series(y, name='target', index=X.index)], axis=1),
    'target'
)
# Compare selection methods
selection_comparison = feature_selector.compare_selection_methods(X, y)
print("\nFeature selection complete")
print("Features chosen by each method:")
for method, result in selection_comparison.items():
    print(f"{method}: {list(result['features'])[:5]}")
```
## Automated Feature Engineering

### Automatic Feature Generation
```python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer

class AutomatedFeatureEngineering:
    """Automated feature engineering."""

    def __init__(self):
        self.preprocessor = None
        self.feature_names = []

    def create_preprocessing_pipeline(self, numeric_features, categorical_features):
        """Build an sklearn preprocessing pipeline."""
        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ])
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
        ])
        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ]
        )
        return self.preprocessor

    def auto_generate_features(self, df, target_col=None):
        """Automatically generate features based on detected column types."""
        generated_df = df.copy()
        # Detect feature types
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = df.select_dtypes(include=['object']).columns.tolist()
        if target_col and target_col in numeric_cols:
            numeric_cols.remove(target_col)
        # Numeric transformations
        numeric_processor = NumericFeatureProcessor()
        if len(numeric_cols) >= 2:
            # Interaction features
            feature_pairs = [(numeric_cols[0], numeric_cols[1])]
            generated_df = numeric_processor.create_interaction_features(
                generated_df, feature_pairs
            )
            # Polynomial features
            generated_df = numeric_processor.create_polynomial_features(
                generated_df, numeric_cols[:2], degree=2
            )
        # Categorical encodings
        categorical_encoder = CategoricalFeatureEncoder()
        if categorical_cols:
            generated_df = categorical_encoder.frequency_encoding(
                generated_df, categorical_cols
            )
        self.feature_names = generated_df.columns.tolist()
        return generated_df

    def evaluate_feature_importance(self, X, y, top_k=10):
        """Rank features by random forest importance."""
        from sklearn.ensemble import RandomForestRegressor
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y)
        feature_importance = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)
        return feature_importance.head(top_k)

# Automated feature engineering example
auto_fe = AutomatedFeatureEngineering()
sample_df = pipeline.load_sample_data()
auto_features = auto_fe.auto_generate_features(sample_df, 'target')
print("\nAutomated feature engineering complete")
print(f"Original feature count: {sample_df.shape[1]}")
print(f"Generated feature count: {auto_features.shape[1]}")
print(f"Sample new features: {[col for col in auto_features.columns if col not in sample_df.columns][:5]}")
```
## Feature Engineering Best Practices

### Performance Optimization Checklist
```python
class FeatureEngineeringBestPractices:
    """Feature engineering best practices."""

    def __init__(self):
        self.checklist = {
            'Data quality': [
                'Handle missing values',
                'Handle outliers',
                'Validate data distributions',
                'Check data consistency'
            ],
            'Feature generation': [
                'Create domain-specific features',
                'Generate interaction features',
                'Consider time series features',
                'Encode categorical features'
            ],
            'Feature selection': [
                'Remove low-variance features',
                'Handle multicollinearity',
                'Select features by importance',
                'Validate feature stability'
            ],
            'Performance optimization': [
                'Standardize features',
                'Handle skewed data',
                'Optimize memory usage',
                'Build feature pipelines'
            ]
        }

    def validate_features(self, X, y=None):
        """Run basic feature validation checks."""
        validation_results = {}
        # Missing values above a 5% threshold
        missing_ratio = X.isnull().sum() / len(X)
        validation_results['missing_values'] = missing_ratio[missing_ratio > 0.05]
        # Low-variance numeric features
        from sklearn.feature_selection import VarianceThreshold
        numeric_X = X.select_dtypes(include=[np.number])
        selector = VarianceThreshold(threshold=0.01)
        try:
            selector.fit(numeric_X)
            low_variance_mask = ~selector.get_support()
            validation_results['low_variance_features'] = numeric_X.columns[low_variance_mask].tolist()
        except ValueError:
            validation_results['low_variance_features'] = []
        # Correlation of numeric features with the target
        if y is not None:
            correlations = []
            for col in numeric_X.columns:
                corr = np.corrcoef(numeric_X[col], y)[0, 1]
                if not np.isnan(corr):
                    correlations.append((col, abs(corr)))
            validation_results['target_correlations'] = sorted(
                correlations, key=lambda x: x[1], reverse=True
            )[:10]
        return validation_results

    def create_feature_documentation(self, feature_df):
        """Create a per-feature documentation dictionary."""
        documentation = {}
        for col in feature_df.columns:
            doc = {
                'data_type': str(feature_df[col].dtype),
                'missing_count': feature_df[col].isnull().sum(),
                'unique_count': feature_df[col].nunique(),
                'description': f'Auto-generated feature: {col}'
            }
            if pd.api.types.is_numeric_dtype(feature_df[col]):
                doc.update({
                    'min_value': feature_df[col].min(),
                    'max_value': feature_df[col].max(),
                    'mean_value': feature_df[col].mean(),
                    'std_value': feature_df[col].std()
                })
            documentation[col] = doc
        return documentation

# Apply best practices
best_practices = FeatureEngineeringBestPractices()
# Validate features
validation_results = best_practices.validate_features(auto_features, sample_data['target'])
# Create feature documentation
feature_docs = best_practices.create_feature_documentation(auto_features)
print("\nBest-practice checks for feature engineering")
print("Validation results:")
for check_type in ['missing_values', 'low_variance_features']:
    results = validation_results.get(check_type, [])
    if len(results) > 0:
        print(f"{check_type}: {len(results)} issue(s)")
    else:
        print(f"{check_type}: passed")
print(f"\nFeature documentation created for {len(feature_docs)} features")
```
## Summary and Outlook

### Feature Engineering Techniques Compared
| Technique Category | Main Methods | Typical Use Cases | Trade-offs |
|---|---|---|---|
| Numeric processing | Standardization, binning, polynomial features | Continuous numeric data | Improves model stability; may discard information |
| Categorical encoding | One-hot, target encoding, frequency encoding | Categorical variables | Captures category effects; can inflate dimensionality |
| Time features | Cyclical encoding, lag features, seasonal decomposition | Time series data | Captures temporal patterns; requires time alignment |
| Text features | TF-IDF, word embeddings, n-grams | Natural language data | Extracts semantic information; computationally expensive |
| Feature selection | Correlation, model importance, recursive elimination | High-dimensional data | Reduces overfitting; may drop important features |
### Trends for 2025
- Automated feature engineering: AutoML-driven intelligent feature generation
- Deep-learning features: neural networks learning feature representations automatically
- Interpretable features: keeping feature engineering transparent and explainable
- Real-time feature engineering: streaming data processing and live feature updates
- Cross-domain feature learning: applying transfer learning to feature engineering
### Practical Recommendations
- Understand the business context: feature engineering must build on domain knowledge
- Iterate: validate and refine features through repeated experiments
- Monitor feature stability: make sure features stay stable over time (see the PSI sketch after this list)
- Document the process: record the logic and assumptions behind each feature
- Mind computational cost: balance effectiveness against efficiency
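For the stability point above, a common drift metric is the Population Stability Index (PSI), which compares a feature's binned distribution between a baseline window and a current window. A minimal sketch (the helper name is hypothetical; the 0.1/0.25 alert thresholds are conventional rules of thumb):

```python
def population_stability_index(expected, actual, n_bins=10):
    """PSI between a baseline sample and a current sample of one continuous feature."""
    # Bin edges from the baseline distribution
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    expected_pct = pd.cut(expected, edges).value_counts(normalize=True, sort=False) + 1e-6
    actual_pct = pd.cut(actual, edges).value_counts(normalize=True, sort=False) + 1e-6
    return float(np.sum((actual_pct.values - expected_pct.values) *
                        np.log(actual_pct.values / expected_pct.values)))

# Rule of thumb: PSI < 0.1 stable; 0.1-0.25 monitor; > 0.25 investigate
baseline, current = sample_data['income'][:500], sample_data['income'][500:]
print(f"income PSI: {population_stability_index(baseline, current):.4f}")
```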
With a systematic feature engineering workflow, grounded in domain knowledge and a solid understanding of the data, you can significantly improve the performance and robustness of machine learning models. Remember: good feature engineering often delivers larger gains than a more complex model.