有这么一句话在业界广泛流传:数据和特征决定了机器学习的上限,而模型和算法只是逼近这个上限而已。由此可见,特征工程在机器学习中占有相当重要的地位。在实际应用当中,可以说特征工程是机器学习成功的关键。
特征工程是数据分析中最耗时间和精力的一部分工作,它不像算法和模型那样是确定的步骤,更多是工程上的经验和权衡。因此没有统一的方法。这里只是对一些常用的方法做一个总结。
特征工程包含了 Data PreProcessing(数据预处理)、Feature Extraction(特征提取)、Feature Selection(特征选择)和 Feature construction(特征构造)等子问题。
数据预处理
数据预处理是特征工程的最重要的起始步骤,需要把特征预处理成机器学习模型所能接受的形式。
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import Observation
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import warnings
# Setting configuration.
warnings.filterwarnings('ignore')
SEED = 42
# Use 0.11.4-spark3.3 version for Spark3.3 and 1.0.2 version for Spark3.4
spark = SparkSession.builder \
.master("local[*]") \
.appName("XGBoost with PySpark") \
.config("spark.driver.memory", "10g") \
.config("spark.driver.cores", "2") \
.config("spark.executor.memory", "10g") \
.config("spark.executor.cores", "2") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')
24/06/01 11:20:13 WARN Utils: Your hostname, MacBook-Air resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/06/01 11:20:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/01 11:20:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
探索性数据分析
本项目使用 Kaggle 上的 家庭信用违约风险数据集 (Home Credit Default Risk) ,是一个标准的机器学习分类问题。其目标是使用历史贷款的信息,以及客户的社会经济和财务信息,预测客户是否会违约。
本篇主要通过 application 文件,做基本的数据分析和建模,也是本篇的主要内容。
df = spark.sql("select * from home_credit_default_risk.application_train")
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
df.limit(5).toPandas()
SK_ID_CURR | TARGET | NAME_CONTRACT_TYPE | CODE_GENDER | FLAG_OWN_CAR | FLAG_OWN_REALTY | CNT_CHILDREN | AMT_INCOME_TOTAL | AMT_CREDIT | AMT_ANNUITY | ... | FLAG_DOCUMENT_18 | FLAG_DOCUMENT_19 | FLAG_DOCUMENT_20 | FLAG_DOCUMENT_21 | AMT_REQ_CREDIT_BUREAU_HOUR | AMT_REQ_CREDIT_BUREAU_DAY | AMT_REQ_CREDIT_BUREAU_WEEK | AMT_REQ_CREDIT_BUREAU_MON | AMT_REQ_CREDIT_BUREAU_QRT | AMT_REQ_CREDIT_BUREAU_YEAR | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 191480 | 0 | Cash loans | M | Y | N | 0 | 157500.0 | 342000.0 | 17590.5 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 7.0 |
1 | 191502 | 0 | Cash loans | F | N | Y | 0 | 108000.0 | 324000.0 | 20704.5 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
2 | 191673 | 0 | Cash loans | F | Y | Y | 0 | 135000.0 | 1323000.0 | 36513.0 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 2.0 |
3 | 191877 | 0 | Cash loans | F | N | Y | 2 | 45000.0 | 47970.0 | 5296.5 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 4.0 |
4 | 192108 | 0 | Cash loans | F | N | Y | 0 | 315000.0 | 263686.5 | 13522.5 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 5.0 | 2.0 | 3.0 |
5 rows × 122 columns
print(f"dataset shape: ({df.count()}, {len(df.columns)})")
dataset shape: (307511, 122)
# df.printSchema()
在遇到非常多的数据的时候,我们一般先会按照数据的类型分布下手,看看不同的数据类型各有多少
# Number of each type of column
dtypes = dict(df.dtypes)
pd.Series(dtypes).value_counts()
double 65
int 41
string 16
Name: count, dtype: int64
接下来看下数据集的统计信息
df.summary().toPandas()
summary | SK_ID_CURR | TARGET | NAME_CONTRACT_TYPE | CODE_GENDER | FLAG_OWN_CAR | FLAG_OWN_REALTY | CNT_CHILDREN | AMT_INCOME_TOTAL | AMT_CREDIT | ... | FLAG_DOCUMENT_18 | FLAG_DOCUMENT_19 | FLAG_DOCUMENT_20 | FLAG_DOCUMENT_21 | AMT_REQ_CREDIT_BUREAU_HOUR | AMT_REQ_CREDIT_BUREAU_DAY | AMT_REQ_CREDIT_BUREAU_WEEK | AMT_REQ_CREDIT_BUREAU_MON | AMT_REQ_CREDIT_BUREAU_QRT | AMT_REQ_CREDIT_BUREAU_YEAR | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | count | 307511 | 307511 | 307511 | 307511 | 307511 | 307511 | 307511 | 307511 | 307511 | ... | 307511 | 307511 | 307511 | 307511 | 265992 | 265992 | 265992 | 265992 | 265992 | 265992 |
1 | mean | 278180.51857657125 | 0.08072881945686496 | None | None | None | None | 0.4170517477423572 | 168797.91929698447 | 599025.9997057016 | ... | 0.008129790479039774 | 5.951006630657115E-4 | 5.072989258920819E-4 | 3.349473677364387E-4 | 0.006402448193930645 | 0.0070002105326475985 | 0.0343619356973142 | 0.26739526000781977 | 0.26547414959848414 | 1.899974435321363 |
2 | stddev | 102790.17534842461 | 0.2724186456483938 | None | None | None | None | 0.722121384437625 | 237123.14627885612 | 402490.776995855 | ... | 0.0897982361093956 | 0.024387465065862264 | 0.022517620268446132 | 0.01829853182243764 | 0.08384912844747726 | 0.11075740632435459 | 0.20468487581282443 | 0.9160023961526171 | 0.7940556483207575 | 1.8692949981815559 |
3 | min | 100002 | 0 | Cash loans | F | N | N | 0 | 25650.0 | 45000.0 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
4 | 25% | 189124 | 0 | None | None | None | None | 0 | 112500.0 | 270000.0 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
5 | 50% | 278173 | 0 | None | None | None | None | 0 | 146250.0 | 513531.0 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
6 | 75% | 367118 | 0 | None | None | None | None | 1 | 202500.0 | 808650.0 | ... | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 3.0 |
7 | max | 456255 | 1 | Revolving loans | XNA | Y | Y | 19 | 1.17E8 | 4050000.0 | ... | 1 | 1 | 1 | 1 | 4.0 | 9.0 | 8.0 | 27.0 | 261.0 | 25.0 |
8 rows × 123 columns
查看目标变量分布
# `TARGET` is the target variable we are trying to predict (0 or 1):
# 1 = Not Repaid
# 0 = Repaid
# Check if the data is unbalanced
row = df.select(fn.mean('TARGET').alias('rate')).first()
print(f"percentage of default : {row['rate']:.2%}")
df.groupBy("TARGET").count().show()
percentage of default : 8.07%
+------+------+
|TARGET| count|
+------+------+
| 1| 24825|
| 0|282686|
+------+------+
数据清洗
数据清洗 (Data cleaning) 是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。
数据去重
首先,根据某个 / 多个特征值构成的样本 ID 去重
# `SK_ID_CURR` is the unique id of the row.
df.dropDuplicates(subset=["SK_ID_CURR"]).count() == df.count()
True
数据类型转换
dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']
有时,有些数值型特征标识的只是不同类别,其数值的大小并没有实际意义,因此我们将其转化为类别特征。
本项目并无此类特征,以 hours_appr_process_start 为示例:
# df = df.withColumn('HOUR_APPR_PROCESS_START', df['HOUR_APPR_PROCESS_START'].astype(str))
错误数据清洗
接下来,我们根据业务常识,或者使用但不限于箱型图(Box-plot)发现数据中不合理的特征值进行清洗。
数据探索时,我们注意到,DAYS_BIRTH列(年龄)中的数字是负数,由于它们是相对于当前贷款申请计算的,所以我们将其转化成正数后查看分布
df.select(df['DAYS_BIRTH'] / -365).summary().show()
+-------+-------------------+
|summary|(DAYS_BIRTH / -365)|
+-------+-------------------+
| count| 307511|
| mean| 43.93697278587162|
| stddev| 11.956133237768654|
| min| 20.517808219178082|
| 25%| 34.00547945205479|
| 50%| 43.14794520547945|
| 75%| 53.917808219178085|
| max| 69.12054794520547|
+-------+-------------------+
那些年龄看起来合理,没有异常值。
接下来,我们对其他的 DAYS 特征作同样的分析
for feature in ['DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_REGISTRATION', 'DAYS_ID_PUBLISH']:
print(f'{feature} info: ')
df.select(df[feature] / -365).summary().show()
DAYS_BIRTH info:
+-------+-------------------+
|summary|(DAYS_BIRTH / -365)|
+-------+-------------------+
| count| 307511|
| mean| 43.93697278587162|
| stddev| 11.956133237768654|
| min| 20.517808219178082|
| 25%| 34.00547945205479|
| 50%| 43.14794520547945|
| 75%| 53.917808219178085|
| max| 69.12054794520547|
+-------+-------------------+
DAYS_EMPLOYED info:
+-------+----------------------+
|summary|(DAYS_EMPLOYED / -365)|
+-------+----------------------+
| count| 307511|
| mean| -174.83574220287002|
| stddev| 387.05689457185537|
| min| -1000.6657534246575|
| 25%| 0.7917808219178082|
| 50%| 3.3232876712328765|
| 75%| 7.558904109589041|
| max| 49.07397260273972|
+-------+----------------------+
DAYS_REGISTRATION info:
+-------+--------------------------+
|summary|(DAYS_REGISTRATION / -365)|
+-------+--------------------------+
| count| 307511|
| mean| 13.660603637091562|
| stddev| 9.651743345104306|
| min| -0.0|
| 25%| 5.504109589041096|
| 50%| 12.336986301369864|
| 75%| 20.487671232876714|
| max| 67.59452054794521|
+-------+--------------------------+
DAYS_ID_PUBLISH info:
+-------+------------------------+
|summary|(DAYS_ID_PUBLISH / -365)|
+-------+------------------------+
| count| 307511|
| mean| 8.20329417328335|
| stddev| 4.135480600008283|
| min| -0.0|
| 25%| 4.7095890410958905|
| 50%| 8.915068493150685|
| 75%| 11.775342465753425|
| max| 19.71780821917808|
+-------+------------------------+
buckets = df.select((df['DAYS_EMPLOYED'] / -365).alias('DAYS_EMPLOYED'))
bucketizer = ft.QuantileDiscretizer(numBuckets=10, inputCol='DAYS_EMPLOYED', outputCol='buckets').fit(buckets)
buckets = bucketizer.transform(buckets)
buckets.groupBy('buckets').count().sort('buckets').show()
bucketizer.getSplits()
+-------+-----+
|buckets|count|
+-------+-----+
| 1.0|61425|
| 2.0|30699|
| 3.0|30733|
| 4.0|30685|
| 5.0|30741|
| 6.0|30716|
| 7.0|30750|
| 8.0|30726|
| 9.0|31036|
+-------+-----+
[-inf,
-1000.6657534246575,
0.39452054794520547,
1.252054794520548,
2.2465753424657535,
3.317808219178082,
4.635616438356164,
6.457534246575342,
8.827397260273973,
13.2986301369863,
inf]
有超过60000个用户的DAYS_EMPLOYED在1000年上,可以猜测这只是缺失值标记。
# Replace the anomalous values with nan
df_emp = df.select(fn.when(df['DAYS_EMPLOYED']>=365243, None).otherwise(df['DAYS_EMPLOYED']).alias('DAYS_EMPLOYED'))
df_emp.sample(0.1).toPandas().plot.hist(title = 'Days Employment Histogram')
plt.xlabel('Days Employment')
可以看到,数据分布基本正常了。
布尔特征清洗
for col in categorical_cols:
unique_count = df.select(col).dropna().distinct().count()
if unique_count == 2:
df.groupBy(col).count().show()
+------------------+------+
|NAME_CONTRACT_TYPE| count|
+------------------+------+
| Revolving loans| 29279|
| Cash loans|278232|
+------------------+------+
+------------+------+
|FLAG_OWN_CAR| count|
+------------+------+
| Y|104587|
| N|202924|
+------------+------+
+---------------+------+
|FLAG_OWN_REALTY| count|
+---------------+------+
| Y|213312|
| N| 94199|
+---------------+------+
+-------------------+------+
|EMERGENCYSTATE_MODE| count|
+-------------------+------+
| NULL|145755|
| No|159428|
| Yes| 2328|
+-------------------+------+
cols_to_transform = ['FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'EMERGENCYSTATE_MODE']
df.replace(
['Y', 'N', 'Yes', 'No'], ['1', '0', '1', '0'],
subset=cols_to_transform
).select(cols_to_transform).show(5)
+------------+---------------+-------------------+
|FLAG_OWN_CAR|FLAG_OWN_REALTY|EMERGENCYSTATE_MODE|
+------------+---------------+-------------------+
| 1| 0| 0|
| 0| 1| 0|
| 1| 1| NULL|
| 0| 1| NULL|
| 0| 1| 0|
+------------+---------------+-------------------+
only showing top 5 rows
函数封装
最后,使用函数封装以上步骤:
dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']
# Data cleaning
def clean(df):
# remove duplicates.
df = df.dropDuplicates(subset=["SK_ID_CURR"])
# transform
cols_to_transform = ['FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'EMERGENCYSTATE_MODE']
df = df.replace(
['Y', 'N', 'Yes', 'No'], ['1', '0', '1', '0'],
subset=cols_to_transform
)
df = df.withColumns({c: df[c].cast('int') for c in cols_to_transform})
# Replace the anomalous values with nan
df = df.withColumn('DAYS_EMPLOYED',
fn.when(df['DAYS_EMPLOYED']>=365243, None).otherwise(df['DAYS_EMPLOYED'])
)
df = df.replace('XNA', None)
df = df.withColumnRenamed("TARGET", "label")
return df
df = clean(df)
特征重编码
有很多机器学习算法只能接受数值型特征的输入,不能处理离散值特征,比如线性回归,逻辑回归等线性模型,那么我们需要将离散特征重编码成数值变量。
现在我们来看看每个分类特征的类别数:
df.select([fn.countDistinct(col).alias(col) for col in categorical_cols]).show(1, vertical=True)
-RECORD 0-------------------------
NAME_CONTRACT_TYPE | 2
CODE_GENDER | 2
FLAG_OWN_CAR | 2
FLAG_OWN_REALTY | 2
NAME_TYPE_SUITE | 7
NAME_INCOME_TYPE | 8
NAME_EDUCATION_TYPE | 5
NAME_FAMILY_STATUS | 6
NAME_HOUSING_TYPE | 6
OCCUPATION_TYPE | 18
WEEKDAY_APPR_PROCESS_START | 7
ORGANIZATION_TYPE | 57
FONDKAPREMONT_MODE | 4
HOUSETYPE_MODE | 3
WALLSMATERIAL_MODE | 7
EMERGENCYSTATE_MODE | 2
- 变量 NAME_EDUCATION_TYPE 表征着潜在的排序关系,可以使用顺序编码。
- 变量 OCCUPATION_TYPE (职业类型)和 ORGANIZATION_TYPE 类别数较多,准备使用平均数编码。
- 剩余的无序分类特征使用one-hot编码。
顺序编码
有序分类特征实际上表征着潜在的排序关系,我们将这些特征的类别映射成有大小的数字,因此可以用顺序编码。
让我们从分类特征中手动提取有序级别:
# The ordinal (ordered) categorical features
# Pandas calls the categories "levels"
ordered_levels = {
"NAME_EDUCATION_TYPE": ["Lower secondary",
"Secondary / secondary special",
"Incomplete higher",
"Higher education"]
}
spark中的StringIndexer是按特征值出现的频率编码,我们需要自定义一个编码函数。
def ordinal_encode(df, levels):
for var, to_replace in levels.items():
mapping = {v: str(i) for i,v in enumerate(to_replace)}
df = df.replace(mapping, subset=[var])
df = df.withColumn(var, df[var].cast('int'))
print(f'{len(levels):d} columns were ordinal encoded')
return df
ordinal_encode(df, ordered_levels).groupBy(*ordered_levels.keys()).count().show()
1 columns were ordinal encoded
+-------------------+------+
|NAME_EDUCATION_TYPE| count|
+-------------------+------+
| NULL| 164|
| 1|218391|
| 3| 74863|
| 2| 10277|
| 0| 3816|
+-------------------+------+
平均数编码
一般情况下,针对分类特征,我们只需要OneHotEncoder或OrdinalEncoder进行编码,这类简单的预处理能够满足大多数数据挖掘算法的需求。如果某一个分类特征的可能值非常多(高基数 high cardinality),那么再使用one-hot编码往往会出现维度爆炸。平均数编码(mean encoding)是一种高效的编码方式,在实际应用中,能极大提升模型的性能。
变量 OCCUPATION_TYPE (职业类型)和 ORGANIZATION_TYPE类别数较多,准备使用平均数编码。
class MeanEncoder(Estimator, Transformer):
def __init__(self, smoothing=0.0, inputCols=None, labelCol="label"):
"""
The MeanEncoder() replaces categories by the mean value of the target for each
category.
math:
mapping = (w_i) posterior + (1-w_i) prior
where
w_i = n_i t / (s + n_i t)
In the previous equation, t is the target variance in the entire dataset, s is the
target variance within the category and n is the number of observations for the
category.
Parameters
----------
smoothing: int, float, 'auto', default=0.0
"""
super().__init__()
self.smoothing = smoothing
self.inputCols = inputCols
self.labelCol = labelCol
def _fit(self, df):
"""
Learn the mean value of the target for each category of the variable.
"""
self.encoder_dict = {}
inputCols = self.inputCols
labelCol = self.labelCol
y_prior = df.select(fn.mean(labelCol).alias("mean")).first()["mean"]
for var in inputCols:
if self.smoothing == "auto":
y_var = df.cov(labelCol, labelCol)
damping = fn.variance(labelCol) / y_var
else:
damping = fn.lit(self.smoothing)
groups = df.groupBy(var).agg(
fn.mean(labelCol).alias("posterior"),
fn.count("*").alias("counts"),
damping.alias("damping")
).toPandas().dropna()
groups["lambda"] = groups["counts"] / (groups["counts"] + groups["damping"])
groups["code"] = (
groups["lambda"] * groups["posterior"] +
(1.0 - groups["lambda"]) * y_prior
)
self.encoder_dict[var] = dict(zip(groups[var], groups["code"]))
return self
def _transform(self, df):
for var in self.encoder_dict:
mapping = {k: str(v) for k,v in self.encoder_dict[var].items()}
df = df.replace(mapping, subset=[var])
df = df.withColumn(var, df[var].cast('float'))
print(f'{len(self.encoder_dict):d} columns were mean encoded')
return df
# replace categories by the mean value of the target for each category.
inputCols = ['OCCUPATION_TYPE', 'ORGANIZATION_TYPE']
mean_encoder = MeanEncoder(
inputCols=inputCols,
labelCol='label',
smoothing='auto'
)
mean_encoder.fit(df).transform(df).select(inputCols).show(5)
2 columns were mean encoded
+---------------+-----------------+
|OCCUPATION_TYPE|ORGANIZATION_TYPE|
+---------------+-----------------+
| 0.062140968| 0.09299603|
| 0.09631742| 0.09449421|
| 0.113258936| 0.10173836|
| NULL| NULL|
| NULL| NULL|
+---------------+-----------------+
only showing top 5 rows
哑变量编码
无序分类特征对于树集成模型(tree-ensemble like XGBoost)是可用的,但对于线性模型(like Lasso or Ridge)则必须使用one-hot重编码。接下来我们把上节索引化的无序分类特征进行编码。
# The nominative (unordered) categorical features
encoded_cols = ['NAME_EDUCATION_TYPE', 'OCCUPATION_TYPE', 'ORGANIZATION_TYPE']
nominal_categories = [col for col in categorical_cols if col not in encoded_cols]
indexedCols = [f"indexed_{col}" for col in nominal_cate