Hands-On Project: Multi-Class Text Classification with PySpark

Date: 2022-06-02 06:41:51

Original article: https://cloud.tencent.com/developer/article/1096712

Building on the original author's work, this post adds annotations and a few things I learned along the way.

Goal: classify San Francisco crime records (San Francisco Crime Description) into 33 categories.

Source code and dataset: to be posted later.

1. Loading the dataset

import time
from pyspark.sql import SQLContext
from pyspark import SparkContext
# Load the CSV directly with Spark's csv reader
sc = SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('train.csv')
# Sample about 1% of the rows (without replacement, seed 100) to keep runtimes short
data = data.sample(False, 0.01, 100)
print(data.count())
Output:
8703

1.1 Drop the columns we don't need

# Drop the columns irrelevant to this task and show the first five rows
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

1.2 Inspect the schema

# Display the DataFrame's structure with printSchema()
data.printSchema()

Output:

root
|-- Category: string (nullable = true)
|-- Descript: string (nullable = true)

1.3 The 20 most frequent crime categories
# The 20 crime categories with the most records
from pyspark.sql.functions import col
data.groupBy('Category').count().orderBy(col('count').desc()).show()

Output:

+--------------------+-----+
| Category|count|
+--------------------+-----+
| LARCENY/THEFT| 1725|
| OTHER OFFENSES| 1230|
| NON-CRIMINAL| 962|
| ASSAULT| 763|
| VEHICLE THEFT| 541|
| DRUG/NARCOTIC| 494|
| VANDALISM| 447|
| WARRANTS| 406|
| BURGLARY| 347|
| SUSPICIOUS OCC| 295|
| MISSING PERSON| 284|
| ROBBERY| 225|
| FRAUD| 159|
| SECONDARY CODES| 124|
|FORGERY/COUNTERFE...| 109|
| WEAPON LAWS| 86|
| TRESPASS| 63|
| PROSTITUTION| 59|
| DISORDERLY CONDUCT| 54|
| DRUNKENNESS| 52|
+--------------------+-----+
only showing top 20 rows

1.4 The 20 most frequent crime descriptions

# The 20 crime descriptions with the most records
data.groupBy('Descript').count().orderBy(col('count').desc()).show()

Output:

+--------------------+-----+
| Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...| 569|
| LOST PROPERTY| 323|
| BATTERY| 301|
| STOLEN AUTOMOBILE| 262|
|DRIVERS LICENSE, ...| 244|
|AIDED CASE, MENTA...| 223|
| WARRANT ARREST| 222|
|PETTY THEFT FROM ...| 216|
|SUSPICIOUS OCCURR...| 211|
|MALICIOUS MISCHIE...| 184|
| TRAFFIC VIOLATION| 168|
|THREATS AGAINST LIFE| 154|
|PETTY THEFT OF PR...| 152|
| FOUND PROPERTY| 138|
|MALICIOUS MISCHIE...| 138|
|ENROUTE TO OUTSID...| 121|
|GRAND THEFT OF PR...| 115|
|MISCELLANEOUS INV...| 101|
| DOMESTIC VIOLENCE| 99|
| FOUND PERSON| 98|
+--------------------+-----+
only showing top 20 rows

2. Tokenizing the crime descriptions
2.1 Tokenize Descript: split into words, then remove stop words

The flow closely mirrors the scikit-learn version and has three steps (a toy run of the tokenizer follows the code block below):
1. regexTokenizer: split the text into words using a regular expression
2. stopwordsRemover: remove stop words
3. countVectors: build term-frequency (count) vectors

RegexTokenizer: splits a document into words based on a regex
inputCol: input column name
outputCol: output column name
pattern: the regex pattern; matches are treated as delimiters when splitting

CountVectorizer: builds term-frequency vectors
vocabSize: maximum vocabulary size
minDF: as a float, a term must appear in at least this fraction of documents to enter the vocabulary;
as an int, a term must appear in at least this many documents to enter the vocabulary

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Regex tokenizer
# inputCol: input column name
# outputCol: output column name
regexTokenizer = RegexTokenizer(inputCol='Descript', outputCol='words', pattern='\\W')
# Stop words
add_stopwords = ['http', 'https', 'amp', 'rt', 't', 'c', 'the']
stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered').setStopWords(add_stopwords)
# Term-frequency (count) vectors
count_vectors = CountVectorizer(inputCol='filtered', outputCol='features', vocabSize=10000, minDF=5)
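
As a quick sanity check, the tokenizer can be run on its own before fitting the whole pipeline. A minimal sketch (the one-row DataFrame here is made up; it reuses the sqlContext from section 1): pattern='\\W' splits on any non-word character, and RegexTokenizer lowercases by default.

toy = sqlContext.createDataFrame([('GRAND THEFT FROM LOCKED AUTO',)], ['Descript'])
regexTokenizer.transform(toy).select('words').show(truncate=False)
# Expected output: [grand, theft, from, locked, auto]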

2.2 Index the labels by frequency, with the most frequent label mapped to 0

StringIndexer
StringIndexer encodes a column of string labels as a column of label indices, ordered by label frequency: the most frequent label gets index 0. In this example, Category is encoded as an integer from 0 to 32, with the most frequent category encoded as 0.

Pipeline is a high-level API built on DataFrames that makes it easy to construct and debug machine-learning workflows: multiple stages run in sequence, so the whole data-processing chain stays efficient and reproducible.

fit(): trains the pipeline on a DataFrame and returns a PipelineModel (a Transformer)
transform(): applies the fitted pipeline, appending the new feature and label columns to the DataFrame

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol='Category', outputCol='label')
pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, count_vectors, label_stringIdx])
# Fit the pipeline to the training documents
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
dataset.show(5)

Output:

+---------------+--------------------+--------------------+--------------------+--------------------+-----+
| Category| Descript| words| filtered| features|label|
+---------------+--------------------+--------------------+--------------------+--------------------+-----+
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(309,[0,2,3,4,6],...| 0.0|
| VEHICLE THEFT| STOLEN AUTOMOBILE|[stolen, automobile]|[stolen, automobile]|(309,[9,27],[1.0,...| 4.0|
| NON-CRIMINAL| FOUND PROPERTY| [found, property]| [found, property]|(309,[5,32],[1.0,...| 2.0|
|SECONDARY CODES| JUVENILE INVOLVED|[juvenile, involved]|[juvenile, involved]|(309,[67,218],[1....| 13.0|
| OTHER OFFENSES|DRIVERS LICENSE, ...|[drivers, license...|[drivers, license...|(309,[14,23,28,30...| 1.0|
+---------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows
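
Before moving on, it is worth peeking at what the pipeline learned. A small sketch, with stage indices following the stages list above (stage 2 is the fitted CountVectorizerModel, stage 3 the fitted StringIndexerModel):

print(pipeline_fit.stages[2].vocabulary[:10])  # the ten most frequent terms kept by CountVectorizer
print(pipeline_fit.stages[3].labels[:5])       # labels[0] is the Category encoded as label 0.0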
3. Train/test split

# Split into training and test sets (70/30); set a seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print('Training Dataset Count:{}'.format(trainingData.count()))
print('Test Dataset Count:{}'.format(testData.count()))

Output:

Training Dataset Count:6117
Test Dataset Count:2586

4. Model training and evaluation
4.1 Logistic regression with term-frequency features
Run the model on the test set and look at the 10 predictions with the highest probabilities:

LogisticRegression: logistic regression classifier
maxIter: maximum number of iterations
regParam: regularization strength
elasticNetParam: ElasticNet mixing parameter in [0, 1]; 0 means a pure L2 penalty, 1 means pure L1

start_time = time.time()
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
# Keep only rows predicted as class 0 and show the 10 with the highest probabilities
predictions.filter(predictions['prediction'] == 0) \
    .select('Descript', 'Category', 'probability', 'label', 'prediction') \
    .orderBy('probability', ascending=False) \
    .show(n=10, truncate=30)

Output:

+--------------------------+--------+------------------------------+-----+----------+
| Descript|Category| probability|label|prediction|
+--------------------------+--------+------------------------------+-----+----------+
| ARSON OF A VEHICLE| ARSON|[0.1194196587417514,0.10724...| 26.0| 0.0|
| ARSON OF A VEHICLE| ARSON|[0.1194196587417514,0.10724...| 26.0| 0.0|
| ARSON OF A VEHICLE| ARSON|[0.1194196587417514,0.10724...| 26.0| 0.0|
| ATTEMPTED ARSON| ARSON|[0.12978385966276762,0.1084...| 26.0| 0.0|
| CREDIT CARD, THEFT OF| FRAUD|[0.21637136655265077,0.0836...| 12.0| 0.0|
| CREDIT CARD, THEFT OF| FRAUD|[0.21637136655265077,0.0836...| 12.0| 0.0|
| CREDIT CARD, THEFT OF| FRAUD|[0.21637136655265077,0.0836...| 12.0| 0.0|
| CREDIT CARD, THEFT OF| FRAUD|[0.21637136655265077,0.0836...| 12.0| 0.0|
| CREDIT CARD, THEFT OF| FRAUD|[0.21637136655265077,0.0836...| 12.0| 0.0|
|ARSON OF A VACANT BUILDING| ARSON|[0.22897903829071928,0.0980...| 26.0| 0.0|
+--------------------------+--------+------------------------------+-----+----------+
only showing top 10 rows
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# predictionCol: name of the prediction column; with no metricName set,
# evaluate() returns the F1 score, not accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

Output:

0.9641817609126011
8.245999813079834
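
If you want plain accuracy rather than the default F1, metricName selects it explicitly. A small sketch:

accuracy_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', metricName='accuracy')
print(accuracy_evaluator.evaluate(predictions))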
4.2 Logistic regression with TF-IDF features

from pyspark.ml.feature import HashingTF, IDF
start_time = time.time()
# numFeatures: maximum number of hashed features
hashingTF = HashingTF(inputCol='filtered', outputCol='rawFeatures', numFeatures=10000)
# minDocFreq: ignore terms appearing in fewer documents than this
idf = IDF(inputCol='rawFeatures', outputCol='features', minDocFreq=5)
pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, hashingTF, idf, label_stringIdx])
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lr_model = lr.fit(trainingData)
predictions = lr_model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select('Descript', 'Category', 'probability', 'label', 'prediction') \
    .orderBy('probability', ascending=False) \
    .show(n=10, truncate=30)

Output:

+----------------------------+-------------+------------------------------+-----+----------+
| Descript| Category| probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.865376337558355,0.018892...| 0.0| 0.0|
+----------------------------+-------------+------------------------------+-----+----------+
only showing top 10 rows
 evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

Output:

0.9653361434618551
12.998999834060669

4.3 Cross-validation
Use cross-validation to tune the hyperparameters; here we tune the logistic regression model built on term-frequency features.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
start_time = time.time()
pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, count_vectors, label_stringIdx])
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)  # rebuild the count-feature dataset (section 4.2 overwrote it with TF-IDF features)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
# Build the parameter grid for cross-validation
# ParamGridBuilder: builds a grid of parameter combinations for grid-search model selection
# addGrid: adds a list of candidate values for one parameter
# regParam: regularization strength
# maxIter: number of iterations
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .addGrid(lr.maxIter, [10, 20, 50])
             # .addGrid(idf.numFeatures, [10, 100, 1000])
             .build())
# Five-fold cross-validation
# estimator: the estimator to cross-validate
# estimatorParamMaps: the parameter grid to search
# evaluator: the metric used to pick the best parameters
# numFolds: number of folds
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)
cv_model = cv.fit(trainingData)
# Evaluate the best model on the held-out test set
predictions = cv_model.transform(testData)
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

Output:

0.9807684755923513
368.97300004959106
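
The fitted CrossValidatorModel keeps both the per-combination scores and the refit best model, so you can check what the grid search actually picked. A hedged sketch:

print(max(cv_model.avgMetrics))              # best average cross-validation score over the grid
print(cv_model.bestModel.extractParamMap())  # includes the chosen regParam, elasticNetParam and maxIter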
4.4 Naive Bayes

from pyspark.ml.classification import NaiveBayes
start_time = time.time()
# smoothing: additive (Laplace) smoothing parameter
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select('Descript', 'Category', 'probability', 'label', 'prediction') \
    .orderBy('probability', ascending=False) \
    .show(n=10, truncate=30)

Output:

+----------------------+-------------+------------------------------+-----+----------+
| Descript| Category| probability|label|prediction|
+----------------------+-------------+------------------------------+-----+----------+
| PETTY THEFT BICYCLE|LARCENY/THEFT|[1.0,1.236977662838925E-20,...| 0.0| 0.0|
| PETTY THEFT BICYCLE|LARCENY/THEFT|[1.0,1.236977662838925E-20,...| 0.0| 0.0|
| PETTY THEFT BICYCLE|LARCENY/THEFT|[1.0,1.236977662838925E-20,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
|GRAND THEFT PICKPOCKET|LARCENY/THEFT|[1.0,7.699728277574397E-24,...| 0.0| 0.0|
+----------------------+-------------+------------------------------+-----+----------+
only showing top 10 rows
 evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

Output:

0.977432832447723
5.371000051498413
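
One caveat: Spark's NaiveBayes requires non-negative features (both the count and TF-IDF vectors qualify) and defaults to the multinomial variant. The modelType parameter switches variants; a sketch of the alternative:

nb_bernoulli = NaiveBayes(smoothing=1.0, modelType='bernoulli')  # Bernoulli variant; expects binary (0/1) features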
4.5 Random forest

from pyspark.ml.classification import RandomForestClassifier
start_time = time.time()
# numTrees: number of trees to train
# maxDepth: maximum tree depth
# maxBins: maximum number of bins when discretizing continuous features
rf = RandomForestClassifier(labelCol='label',
                            featuresCol='features',
                            numTrees=100,
                            maxDepth=4,
                            maxBins=32)
# Train the model on the training data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select('Descript', 'Category', 'probability', 'label', 'prediction') \
    .orderBy('probability', ascending=False) \
    .show(n=10, truncate=30)

Output:

+----------------------------+-------------+------------------------------+-----+----------+
| Descript| Category| probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33206188381818563,0.1168...| 0.0| 0.0|
+----------------------------+-------------+------------------------------+-----+----------+
only showing top 10 rows
 evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print(evaluator.evaluate(predictions))
end_time = time.time()
print(end_time - start_time)

Output:

0.27929770811242954
36.63699984550476
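
To see where the forest spends its capacity, featureImportances on the fitted model is a sparse vector aligned with the feature indices; with maxDepth=4 over thousands of sparse dimensions, only a handful of features ever get split on. A quick sketch:

print(rfModel.featureImportances)  # nonzero entries are the features the trees actually used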

The results above show that the random forest is an excellent, robust general-purpose model, but it is a poor fit for high-dimensional sparse data like these text features.
The cross-validated logistic regression is clearly the model to choose here.

One caveat with the cross-validated logistic regression: grid search makes training considerably slower, so in a real application pick the model that best matches the business constraints.
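
If the cross-validated logistic regression is the keeper, Spark's ML persistence API can save and reload it; a sketch ('lr_cv_model' is just an arbitrary path):

from pyspark.ml.classification import LogisticRegressionModel
cv_model.bestModel.write().overwrite().save('lr_cv_model')
reloaded = LogisticRegressionModel.load('lr_cv_model')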