背景

在机器学习的资料预处理阶段,若遇到极度不平衡资料 (如金融诈骗、医学疾病、特定事件...) 我们经常需要进行特徵缩放和不平衡资料处理,但我们应该先做哪一个? 这两个步骤的先后顺序一直是个充满争议的话题,我在许多论坛看到各种说法。有人认为应该先进行特徵缩,也有人主张先处理不平衡问题才能得到更好的结果,有些人认为都好。我希望透过这篇文章,借由一些实验,探讨这个问题。

首先,理解两个预处理步骤的本质目的

要做出正确的判断,我们必须先理解这两个预处理步骤各自的核心目标:

(1) 特徵缩放 (标準化 or MinMaxScaler) : 根本目的是将不同尺度的特徵转换到相同的比例尺度上。这不仅能确保模型在学习过程中公平地评估每个特徵的重要性,更能大幅提升计算效率。当特徵被缩放到较小的范围时,各种涉及距离计算的演算法(如SMOTE)能更快速地执行,这是因为小数值的运算本质上就比大数值更有效率。

(2) 不平衡处理 (imbalance sampling) : 目标则是确保模型能充分学习到少数类别的特徵模式。无论是透过过採样产生新的少数类别样本,还是通过欠採样减少多数类别的数量,这个步骤的重点都在于调整类别间的平衡,使模型能更好地识别稀少但重要的模式。

实验设计与分析

为了深入研究处理顺序的影响,我设计了一个实验,给定使用最单纯的logistic预测模型下,比较了三种不同的处理顺序:

  • 先缩放后採样(Scaling → Sampling)
  • 先採样后缩放,使用重採样后的统计量(Sampling → Scaling with Resampled Stats)
  • 先採样后缩放,使用原始资料的统计量(Sampling → Scaling with Original Stats)
  • 其中,比较了StandardScaler和MinMaxScaler两种缩放方法,以及SMOTE、随机欠採样、SMOTE-Tomek三种不平衡处理方法。

    使用资料

    Kaggle 的 Credit Card Fraud Detection 资料集。 资料集连结这是一个欧洲信用卡诈骗侦测资料集,收录了2013年9月两天内的284,807笔交易纪录,其中诈骗案例佔0.172%(492笔)。主要特徵包含28个经PCA转换的匿名特徵(V1-V28),以及原始的交易时间和金额。由于资料极度不平衡,该竞赛建议使用ROCAUC而非準确率来评估模型表现。

    实验结果比较

    1. 採样执行效率

    (各个情境都执行10次,计算平均执行时间和标準差)

    (颜色标住的地方代表该情境下,所花费时间最少)

    实验结果显示,先进行特徵缩放后再处理不平衡问题能够显降低不平衡抽样执行效率 (sampling_time)。这个结果完全符合理论预期:当特徵被缩放到较小的范围后,不平衡处理算法 (特别是基于距离计算的方法如SMOTE)的运算效率自然会提高。

    以SMOTE+标準化缩放为例:当先进行缩放时 (Scaling-->Sampling):总处理时间为 0.1328 秒当先进行不平衡抽样时(Sampling --> Scaling (Resampled Stats)):总处理时间为 0.3754 秒时间减少了约65% (0.1328/0.3754-1)x100%

    这种效率提升的原因在于经过缩放的特徵能够让SMOTE算法中的距离计算更有效率,降低了生成合成样本的时间成本。这种改善在不同的缩放方法,如标準化、MinMax中都能观察到。

    此外,我的训练样本才40多万笔,假设套用到实务,动辄上百万笔的交易数据,不就差异更大了!!?

    2. 统计量特徵

    标準化处理:可以发现若先採样,会把原始资料的平均、标準差大幅改变

    MinMax处理,这边除了Undersampling涉及删减样本,而导致最大最小改变外,其他情境的MinMax都差不多一样

    当我们先进行不平衡处理时,后续的缩放过程会使用改变后的数据分布来计算统计量,这可能导致特徵的相对关系发生变化。相比之下,先进行缩放能确保我们使用原始数据的统计特性,这对于保持特徵间的真实关系至关重要。但如果是透过MinMaxScaler的方式缩放特徵,除非不平衡处里演算法有做到删减样本的动作 (如本次实证採用欠採样,将正常样本删除到与诈欺样本 =2:1),原则上MinMaxScaler的最大、最小若不会变动,则执行先后无差,反之就要注意。

    3. 模型效度

    由于资料极度不平衡,该竞赛建议使用ROCAUC而非準确率来评估模型表现。实验结果显示,在模型效能方面,处理顺序的影响虽然存在,但数据差异并不显着。然而,值得注意的是,在SMOTE和SMOTE-Tomek这两种方法下,先进行特徵缩放的策略在ROC-AUC指标上仍略微领先。

    额外讨论:特别说明Undersampling和SMOTE系列方法的选择考量:Undersampling特性:实验中採用2:1的比例 (删减后正常样本:诈欺样本)。优点是能有效降低误报率 (FP),进而使ROC-AUC表现较好SMOTE系列特性:通过模拟方式扩增异常样本,优点是能更好地检测出异常案例 (FN小于等于Undersampling的FN),但容易过拟。虽然可能导致较高的误报率 (FP),但在实务应用中更适合处理高成本异常案例实务应用建议:当异常样本的成本特别高时 (如金融诈欺造成的巨额损失),建议使用SMOTE系列方法。这类情况下,宁可提高异常检测比率,也不要漏掉真正的异常案例,须根据具体业务场景和成本考量来选择合适的方法。

    个人建议

    基于实验结果和理论分析,我强烈建议在大多数情况下先进行特徵缩放,再处理不平衡问题。这个建议基于以下三个关键理由:

  • 计算效率:特徵缩放后的小数值运算能显着提升后续处理的效率,这是数值计算的基本原理决定的。
  • 统计特性:先进行缩放能够更好地保持原始数据的统计特性,确保特徵间的相对关系不会因为不平衡处里而失真。
  • 实作稳定性:这个处理顺序能提供更可预测和稳定的结果,特别是在使用需要距离计算的採样方法时。
  • 结论

    通过本次的实验,我们有诸多理由得到 「在大多数情况下,先进行特徵缩放再处理不平衡问题是更优的选择。」这个结论不仅基于效率考量,更重要的是基于对数据本质和演算法机制的考量。当然,在特定场景下 (如某些手法下,若使用MinMaxScaler且确定不会影响特徵边界值的情况)处理顺序的影响可能较小,但为了工作流程的一致性和可靠性,建议仍然採用「先缩放后平衡」的标準流程。

    附录-程式码

    import numpy as np
    import pandas as pd
    import time
    import statistics
    from tqdm import tqdm
    from dataclasses import dataclass
    from typing import Dict, List, Tuple
    from sklearn.preprocessing import StandardScaler, MinMaxScaler
    from imblearn.over_sampling import SMOTE
    from imblearn.under_sampling import RandomUnderSampler
    from imblearn.combine import SMOTETomek
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import (
    roc_auc_score,
    confusion_matrix,
    accuracy_score,
    recall_score,
    precision_score,
    f1_score
    )
    from sklearn.linear_model import LogisticRegression

    @dataclass
    class TimingMetrics:
    scaling_time: float = 0.0
    scaling_std: float = 0.0
    sampling_time: float = 0.0
    sampling_std: float = 0.0
    training_time: float = 0.0
    training_std: float = 0.0
    total_time: float = 0.0

    def measure_execution_time(func, num_runs=5):
    """测量函数执行时间,返回平均值和标準差"""
    execution_times = []

    # 预热运行
    for _ in range(3):
    func()

    # 正式测量
    for _ in range(num_runs):
    start_time = time.perf_counter()
    func()
    end_time = time.perf_counter()
    execution_times.append(end_time - start_time)

    mean_time = statistics.mean(execution_times)
    std_dev = statistics.stdev(execution_times) if len(execution_times) > 1 else 0

    return mean_time, std_dev

    def reduce_mem_usage(df):
    """减少DataFrame的记忆体使用"""
    for col in df.columns:
    col_type = df[col].dtype
    if col_type != object:
    c_min = df[col].min()
    c_max = df[col].max()

    if str(col_type)[:3] == \'int\':
    if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
    df[col] = df[col].astype(np.int8)
    elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
    df[col] = df[col].astype(np.int16)
    elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
    df[col] = df[col].astype(np.int32)
    else:
    if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
    df[col] = df[col].astype(np.float32)
    else:
    df[col] = df[col].astype(np.float64)
    return df

    def process_experiment(X_train: np.ndarray, X_test: np.ndarray,
    y_train: np.ndarray, y_test: np.ndarray,
    scaler_class, sampler, process_type: str,
    feature_names: List[str],
    random_state: int = 123,
    num_runs: int = 5) -> Tuple[Dict, List[Dict], TimingMetrics]:
    """执行单次实验流程并记录详细时间"""
    timing = TimingMetrics()
    start_total = time.perf_counter()

    if process_type == "Scaling → Sampling":
    # 建立并保存 scaler
    scaler = scaler_class()

    # 测量缩放时间
    def scaling_step():
    scaler.fit(X_train)
    return scaler.transform(X_train)

    scaling_time, scaling_std = measure_execution_time(scaling_step, num_runs)
    X_train_scaled = scaling_step() # 实际使用的转换结果
    timing.scaling_time = scaling_time
    timing.scaling_std = scaling_std

    # 测量取样时间
    def sampling_step():
    return sampler.fit_resample(X_train_scaled, y_train)

    sampling_time, sampling_std = measure_execution_time(sampling_step, num_runs)
    X_current, y_resampled = sampling_step() # 实际使用的取样结果
    timing.sampling_time = sampling_time
    timing.sampling_std = sampling_std

    elif process_type == "Sampling → Scaling (Resampled Stats)":
    # 测量取样时间
    def sampling_step():
    return sampler.fit_resample(X_train, y_train)

    sampling_time, sampling_std = measure_execution_time(sampling_step, num_runs)
    X_resampled, y_resampled = sampling_step()
    timing.sampling_time = sampling_time
    timing.sampling_std = sampling_std

    # 测量缩放时间
    # 建立并保存 scaler
    scaler = scaler_class()

    def scaling_step():
    scaler.fit(X_resampled)
    return scaler.transform(X_resampled)

    scaling_time, scaling_std = measure_execution_time(scaling_step, num_runs)
    X_current = scaling_step()
    timing.scaling_time = scaling_time
    timing.scaling_std = scaling_std

    else: # "Sampling → Scaling (Original Stats)"
    # 测量缩放时间
    def scaling_step():
    scaler = scaler_class()
    scaler.fit(X_train)
    return scaler.transform(X_train)

    scaling_time, scaling_std = measure_execution_time(scaling_step, num_runs)
    scaler = scaler_class().fit(X_train) # 保存转换器供后续使用
    timing.scaling_time = scaling_time
    timing.scaling_std = scaling_std

    # 测量取样时间
    def sampling_step():
    return sampler.fit_resample(X_train, y_train)

    sampling_time, sampling_std = measure_execution_time(sampling_step, num_runs)
    X_resampled, y_resampled = sampling_step()
    timing.sampling_time = sampling_time
    timing.sampling_std = sampling_std

    X_current = scaler.transform(X_resampled)

    # 收集缩放统计资料
    scaling_stats = []
    for i, feature in enumerate(feature_names):
    stats = {
    \'feature\': feature,
    \'mean_used\': scaler.data_min_[i] if isinstance(scaler, MinMaxScaler) else scaler.mean_[i],
    \'std_used\': scaler.data_max_[i] if isinstance(scaler, MinMaxScaler) else scaler.scale_[i],
    \'mean_result\': X_current[:, i].mean(),
    \'std_result\': X_current[:, i].std()
    }
    scaling_stats.append(stats)

    # 测量训练时间
    def training_step():
    model = LogisticRegression(random_state=random_state, solver=\'liblinear\', max_iter=1000)
    model.fit(X_current, y_resampled)
    return model

    training_time, training_std = measure_execution_time(training_step, num_runs)
    model = training_step() # 实际使用的模型
    timing.training_time = training_time
    timing.training_std = training_std

    # 预测
    X_test_scaled = scaler.transform(X_test)
    y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]
    y_pred_label = (y_pred_proba >= 0.5).astype(int)

    timing.total_time = time.perf_counter() - start_total

    # 计算评估指标
    metrics = {
    \'training_samples\': len(y_resampled),
    \'roc_auc\': roc_auc_score(y_test, y_pred_proba),
    \'confusion_matrix\': confusion_matrix(y_test, y_pred_label).ravel(),
    \'accuracy\': accuracy_score(y_test, y_pred_label),
    \'recall\': recall_score(y_test, y_pred_label),
    \'precision\': precision_score(y_test, y_pred_label),
    \'f1\': f1_score(y_test, y_pred_label)
    }

    return metrics, scaling_stats, timing

    def create_summary_tables(results_df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    """建立详细的统计摘要表格"""
    summary_tables = {}

    # 1. 按照缩放方法分组的统计
    metrics_cols = [\'roc_auc\', \'accuracy\', \'precision\', \'recall\', \'f1\',
    \'scaling_time\', \'scaling_std\', \'sampling_time\', \'sampling_std\',
    \'training_time\', \'training_std\', \'total_time\']

    for group_by in [\'scaling_method\', \'sampling_method\', \'preprocessing_order\']:
    summary = results_df.groupby(group_by)[metrics_cols].agg([
    (\'mean\', \'mean\'),
    (\'std\', \'std\'),
    (\'min\', \'min\'),
    (\'max\', \'max\')
    ]).round(4)

    summary_tables[f\'summary_by_{group_by}\'] = summary

    # 2. 交叉分析表
    for metric in metrics_cols:
    pivot = pd.pivot_table(
    results_df,
    values=metric,
    index=\'scaling_method\',
    columns=[\'sampling_method\', \'preprocessing_order\'],
    aggfunc=\'mean\'
    ).round(4)

    summary_tables[f\'pivot_{metric}\'] = pivot

    return summary_tables

    def main():
    # 设定RANDOM_STATE
    RANDOM_STATE = 123
    NUM_RUNS = 10 # 每个时间测量重复次数

    # 读取资料
    print("Loading and preparing data...")
    data = pd.read_csv("creditcard.csv").dropna()
    #data = data.sample(n=6000, random_state=RANDOM_STATE)
    data = reduce_mem_usage(data)
    print("Dataset Shape:", data.shape)

    # 显示类别分布
    print("\\nClass distribution:")
    counts = data[\'Class\'].value_counts()
    print(f"Normal transactions: {counts[0]}")
    print(f"Fraudulent transactions: {counts[1]}")
    print(f"Fraud ratio: {(counts[1] / len(data) * 100):.4f}%")

    # 準备资料
    X = data.drop(\'Class\', axis=1)
    y = data[\'Class\']
    feature_names = X.columns.tolist()

    # 分割训练测试集
    X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE
    )

    # 不平衡处理方法
    samplers = {
    \'SMOTE\': SMOTE(random_state=RANDOM_STATE),
    \'Undersampling\': RandomUnderSampler(random_state=RANDOM_STATE, sampling_strategy=0.5),
    \'SMOTE-Tomek\': SMOTETomek(random_state=RANDOM_STATE)
    }

    # 缩放方法
    scaler_choices = {
    \'Standard\': StandardScaler,
    \'MinMax\': lambda: MinMaxScaler(feature_range=(-1, 1))
    }

    # 处理顺序
    process_types = [
    "Sampling → Scaling (Resampled Stats)",
    "Sampling → Scaling (Original Stats)",
    "Scaling → Sampling",
    ]

    # 储存结果
    results = []
    all_scaling_stats = []

    print(f"\\nRunning experiments (each timing measurement repeated {NUM_RUNS} times)...")

    # 执行实验
    for scaling_method_name, scaler_class in tqdm(scaler_choices.items(), desc="Scaling Methods"):
    for sampler_name, sampler in tqdm(samplers.items(),
    desc=f"Sampler for {scaling_method_name}",
    leave=False):
    for process_type in process_types:
    # 执行实验并收集结果
    metrics, scaling_stats, timing = process_experiment(
    X_train.copy(), X_test.copy(), y_train.copy(), y_test.copy(),
    scaler_class, sampler, process_type, feature_names,
    RANDOM_STATE, NUM_RUNS
    )

    # 储存结果
    result = {
    \'scaling_method\': scaling_method_name,
    \'preprocessing_order\': process_type,
    \'sampling_method\': sampler_name,
    \'scaling_time\': timing.scaling_time,
    \'scaling_std\': timing.scaling_std,
    \'sampling_time\': timing.sampling_time,
    \'sampling_std\': timing.sampling_std,
    \'training_time\': timing.training_time,
    \'training_std\': timing.training_std,
    \'total_time\': timing.total_time,
    \'roc_auc\': metrics[\'roc_auc\'],
    \'training_samples\': metrics[\'training_samples\'],
    \'TN\': metrics[\'confusion_matrix\'][0],
    \'FP\': metrics[\'confusion_matrix\'][1],
    \'FN\': metrics[\'confusion_matrix\'][2],
    \'TP\': metrics[\'confusion_matrix\'][3],
    \'accuracy\': metrics[\'accuracy\'],
    \'recall\': metrics[\'recall\'],
    \'precision\': metrics[\'precision\'],
    \'f1\': metrics[\'f1\']
    }
    results.append(result)

    # 储存缩放统计
    for stats in scaling_stats:
    stats.update({
    \'scaling_method\': scaling_method_name,
    \'preprocessing_order\': process_type,
    \'sampling_method\': sampler_name
    })
    all_scaling_stats.append(stats)

    # 转换为DataFrame
    results_df = pd.DataFrame(results)
    scaling_stats_df = pd.DataFrame(all_scaling_stats)

    # 生成摘要表格
    summary_tables = create_summary_tables(results_df)

    # 储存结果
    results_df.to_csv(\'detailed_results.csv\', index=False)
    scaling_stats_df.to_csv(\'scaling_statistics.csv\', index=False)

    # 储存摘要表格
    with pd.ExcelWriter(\'summary_statistics.xlsx\') as writer:
    for name, df in summary_tables.items():
    df.to_excel(writer, sheet_name=name[:31]) # Excel sheet名称最长31字元

    print("\\nExperiment completed. Results have been saved:")
    print("- Detailed results: detailed_results.csv")
    print("- Scaling statistics: scaling_statistics.csv")
    print("- Summary statistics: summary_statistics.xlsx")

    # 印出关键结果摘要
    print("\\nKey Performance Metrics Summary:")
    print("\\nROC-AUC Scores by Scaling Method:")
    print(summary_tables[\'summary_by_scaling_method\'][\'roc_auc\'])
    print("\\nROC-AUC Scores by Sampling Method:")
    print(summary_tables[\'summary_by_sampling_method\'][\'roc_auc\'])
    print("\\nROC-AUC Scores by Processing Order:")
    print(summary_tables[\'summary_by_preprocessing_order\'][\'roc_auc\'])

    print("\\nTiming Statistics Summary:")
    print("\\nScaling Time (seconds) by Method:")
    print(summary_tables[\'summary_by_scaling_method\'][[\'scaling_time\', \'scaling_std\']])
    print("\\nSampling Time (seconds) by Method:")
    print(summary_tables[\'summary_by_sampling_method\'][[\'sampling_time\', \'sampling_std\']])

    if __name__ == "__main__":
    main()