问题
实际处理和解决机器学习问题过程中,我们会遇到一些“大数据”问题,比如有上百万条数据,上千上万维特征,此时数据存储已经达到10G这种级别。这种情况下,如果还是直接使用传统的方式肯定行不通,比如当你想把数据load到内存中转成numpy数组,你会发现要么创建不了那么大的numpy矩阵,要么直接加载时报MemeryError。
在这种情况下我了解了几种选择办法,1. 对数据进行降维,2. 使用流式或类似流式处理,3. 上大机器,高内存的,或者用spark集群。
文档
Sklearn里面提供一些流式处理方法。具体可以参考官方文档:
讲解了怎么处理 big data 文件:http://scikit-learn.org/stable/modules/scaling_strategies.html
通过一个例子讲解了怎么用:http://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html
简单介绍
我看了上面两个文档,并使用介绍的SGDClassifier进行分类,效果挺好的,这里记录下用法。
要实现big data的处理,需要满足三个条件:
1. 有流式数据
2. 能从数据中可以提取出特征
3. 增量学习算法
1. 流式数据
第一个条件,要给算法流式数据或小batch的数据,比如一次提供1000条这样。这一块是需要自己写代码提供的,可以实现一个生成器,每调用一次提供一份小batch数据。
2. 提取特征
第二个条件,可以使用任何一种sklearn中支持的特征提取方法。对于一些特殊情况,比如特征需要标准化或者是事先不知道特征值的情况下需要特殊处理。
3. 增量学习算法
对于第三个条件,sklearn中提供了很多增量学习算法。虽然不是所有的算法都可以增量学习,但是学习器提供了 partial_fit
的函数的都可以进行增量学习。事实上,使用小batch的数据中进行增量学习(有时候也称为online learning)是这种学习方式的核心,因为它能让任何一段时间内内存中只有少量的数据。
sklearn提供很多增量学习算法:
- Classification
- sklearn.naive_bayes.MultinomialNB
- sklearn.naive_bayes.BernoulliNB
- sklearn.linear_model.Perceptron
- sklearn.linear_model.SGDClassifier
- sklearn.linear_model.PassiveAggressiveClassifier
- Regression
- sklearn.linear_model.SGDRegressor
- sklearn.linear_model.PassiveAggressiveRegressor
- Clustering
- sklearn.cluster.MiniBatchKMeans
- Decomposition / feature Extraction
- sklearn.decomposition.MiniBatchDictionaryLearning
- sklearn.decomposition.IncrementalPCA
- sklearn.decomposition.LatentDirichletAllocation
- sklearn.cluster.MiniBatchKMeans
其中对于分类问题,在第一次调用partial_fit
时需要通过classes
参数指定分类的类别。
另外有一点需要考虑,所有的学习器在学习过程中不会对每个样例赋予同样的权重。对于感知机,它对于bad样本会敏感,即使学习器已经学习了很多样本了,而对于SGD和PassiveAggressive,对于这种情况会更鲁棒一点,后者在学习的时候,后来学习样本的权重会随着学习器学习率的下降而降低。
实例
这里举一个实际的例子。我这边有上G的训练文件和测试文件,都是csv格式。因为没法直接都读进内存处理,所以选择增量学习的方式处理。
1. 生成一个文件流迭代器
def iter_minibatches(data_stream, minibatch_size=1000):
'''
迭代器
给定文件流(比如一个大文件),每次输出minibatch_size行,默认选择1k行
将输出转化成numpy输出,返回X, y
'''
X = []
y = []
cur_line_num = 0 csvfile = file(data_stream, 'rb')
reader = csv.reader(csvfile)
for line in reader:
y.append(float(line[0]))
X.append(line[1:]) # 这里要将数据转化成float类型 cur_line_num += 1
if cur_line_num >= minibatch_size:
X, y = np.array(X), np.array(y) # 将数据转成numpy的array类型并返回
yield X, y
X, y = [], []
cur_line_num = 0
csvfile.close() # 生成测试文件
minibatch_test_iterators = iter_minibatches(test_file, minibatch_size=5000)
X_test, y_test = minibatch_test_iterators.next() # 得到一份测试文件
2. 增量训练
from sklearn.linear_model import SGDClassifier
sgd_clf = SGDClassifier() # SGDClassifier的参数设置可以参考sklearn官网
minibatch_train_iterators = iter_minibatches(data_part_file, minibatch_size=2000) for i, (X_train, y_train) in enumerate(minibatch_train_iterators):
# 使用 partial_fit ,并在第一次调用 partial_fit 的时候指定 classes
sgd_clf.partial_fit(X_train, y_train, classes=np.array([0, 1]))
print("{} time".format(i)) # 当前次数
print("{} score".format(sgd_clf.score(X_test, y_test))) # 在测试集上看效果
3. 结果
0 time
0.679 score
1 time
0.6954 score
2 time
0.712 score
3 time
0.7248 score
...
57 time
0.745 score
58 time
0.7394 score
59 time
0.7398 score
4. 一点补充
- 当SGD的损失函数为log时,SGD等价于LR。
- 数据只迭代一次分类器可能还没完全收敛,可以多迭代几次
- mini-batch的量不要设置太小,太小的话,需要多迭代几次才能收敛