Spark机器学习之特征提取、选择、转换

时间:2021-12-20 14:39:10
本节介绍了处理特征的算法,大致分为以下几组:
     1、提取:从“原始”数据提取特征
     2、转换:缩放,转换或修改要素
     3、选择:从一组较大的要素中选择一个子集
     4、局部敏感哈希(LSH):这类算法将特征变换的方面与其他算法相结合。


1、特征提取
1.1 TF-IDF
(term frequency–inverse document frequency/词频-逆文本/文档频率)
    词频-逆文本频率(TF-IDF)是在文本挖掘中广泛使用的特征向量化方法,反映了语料库中一个词对文档的重要性(语料库中的其中一份文档的重要程度)。 
用t表示一个单词,用d表示文档,用D表示语料库。词频率TF(t,d)是单词t出现在文档d中的次数,而文档频率DF(t,D)是单词t在语料库D中的频率(出现单词t的文档的次数)。 
如果我们只使用词频率来测量重要性,那么很容易过分强调出现得非常频繁但是携带关于文档的信息很少的单词,例如, “a”,“the”和“of”。 如果一个单词在语料库中经常出现,
则意味着它不携带关于特定文档的特殊信息。 逆文本频率是一个单词提供多少信息的数值度量:

 Spark机器学习之特征提取、选择、转换Spark机器学习之特征提取、选择、转换                            
    其中| D | 是语料库中文档的总数。 由于使用对数,如果一个单词出现在所有文档中,则其IDF值变为0。如果一个词越常见,那么分母就越大,逆文档频率就越小越接近0。
分母之所以要加1,是为了避免分母为0(即所有文档都不包含该单词的情况下出现0)。TF-IDF度量仅仅是TF和IDF的乘积:
                                                                                                              Spark机器学习之特征提取、选择、转换 Spark机器学习之特征提取、选择、转换
    词频率和文档频率的定义有几个变体。 在MLlib中,我们分离TF和IDF以使它们灵活。
    
    TF:HashingTF和CountVectorizer都可以用于生成词频率向量。
    HashingTF是一个变换器,它采用词集合并将这些集合转换成固定长度的特征向量。在文本处理中,“一组词”可能是一袋词。 HashingTF使用散列技巧。
通过应用散列函数将原始要素映射到索引。这里使用的散列函数是MurmurHash 3。然后基于映射的索引来计算项频率。这种方法避免了计算全局术语到索引映射的需要,
其对于大语料库可能是昂贵的,但是它遭受潜在的哈希冲突,其中不同的原始特征可能在哈希后变成相同的术语。为了减少冲突的机会,我们可以增加目标要素维度,即哈希表的桶数。
由于使用简单模数将散列函数转换为列索引,因此建议使用2的幂作为要素维度,否则不会将要素均匀映射到列。默认要素尺寸为2的18次方 = 262144。可选的二进制切换参数控制单词频率计数。
设置为true时,所有非零频率计数都设置为1.这对于模拟二进制计数而不是整数计数的离散概率模型特别有用。
    CountVectorizer将文本文档转换为单词计数的向量。 有关更多详细信息,请参阅CountVectorizer。
    IDF:IDF是一个拟合数据集并生成IDFModel的估计器。 IDFModel获取特征向量(通常由HashingTF或CountVectorizer创建)并缩放每个列。 直观地,它降低了语料库中频繁出现的列的权重。
    注意:spark.ml不提供文本分段工具。 推荐用户使用NLP、scalanlp和chalk。
    
    Examples:
    在下面的代码段中,我们从一组句子开始。 我们使用Tokenizer将每个句子分成单词。 对于每个句子(词袋),我们使用HashingTF将句子散列成特征向量。 我们使用IDF重新缩放特征向量; 
这通常会提高使用文本作为特征时的性能。 然后我们的特征向量可以传递到学习算法中。


from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
],["label", "sentence"])


tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)


hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
#CountVectorizer也可以用于获得项频率向量


idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


rescaledData.select("label", "features").show()
rescaledData.take(1)
输出:
[Row(label=0.0, sentence='Hi I heard about Spark', words=['hi', 'i', 'heard', 'about', 'spark'],
rawFeatures=SparseVector(20, {0: 1.0, 5: 1.0, 9: 1.0, 17: 2.0}), features=SparseVector(20, {0: 0.6931, 5: 0.6931, 9: 0.2877, 17: 1.3863}))]

1.2 Word2Vec
Word2Vec是一个估计器,它接受代表文档的单词序列,并训练一个Word2VecModel。 该模型将每个词映射到唯一的固定大小的向量。 Word2VecModel使用文档中所有单词的平均值将每个文档转换为向量; 此向量可以用作预测,文档相似性计算等的特征。有关详细信息,请参阅ML2ib用户指南。

在下面的代码段中,我们从一组文档开始,每个文档表示为一个单词序列。 对于每个文档,我们将其转换为特征向量。 然后可以将该特征向量传递到学习算法

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
1.3 CountVectorizer

CountVectorizer和CountVectorizerModel旨在帮助将文本文档的集合转换为令牌计数的向量。 当一个先验字典不可用时,CountVectorizer可以用作估计器来提取词汇表,并生成CountVectorizerModel。 该模型为词汇表上的文档生成稀疏表示,然后可以将其传递给其他算法,如LDA。

在拟合过程中,CountVectorizer将选择通过语料库的词频率排序的顶部vocabSize词。 可选参数minDF还通过指定词汇必须出现在词汇表中的文档的最小数量(或小于1.0)来影响拟合过程。 另一个可选的二进制切换参数控制输出向量。 如果设置为true,则所有非零计数都设置为1.这对于模拟二进制计数而不是整数计数的离散概率模型特别有用。

Examples:

假设我们有以下DataFrame和列id和文本:

 id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
文本中的每一行都是Array [String]类型的文档。 调用CountVectorizer的匹配产生具有词汇(a,b,c)的CountVectorizerModel。 然后转换后的输出列“向量”包含

id | texts                           | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
2 特征变换

2.1 Tokenizer /分词器

标记化是获取文本(例如句子)并将其分解成单个术语(通常是单词)的过程。 一个简单的Tokenizer类提供了这个功能。 下面的例子显示了如何将句子分成单词序列。

RegexTokenizer允许基于正则表达式(regex)匹配的更高级的标记化。 默认情况下,参数“pattern”(regex,默认:“\\ s +”)用作分隔符以分隔输入文本。 或者,用户可以将参数“gap”设置为false,指示正则表达式“模式”表示“令牌”,而不是分割间隙,并且找到所有匹配的出现作为令牌化结果。

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
2.2 StopWordsRemover 

停止词是应该从输入中排除的词,通常是因为词频繁出现并且不具有任何意义。
StopWordsRemover接受字符串序列(例如Tokenizer的输出)作为输入,并从输入序列中删除所有停止词。 停用词列表由stopWords参数指定。 某些语言的默认停用词可通过调用StopWordsRemover.loadDefaultStopWords(语言)访问,其中可用的选项是“danish”,“dutch”,“english”,“finnish”,“french”,“german”,“hungarian” “意大利语”,“挪威语”,“葡萄牙语”,“俄语”,“西班牙语”,“瑞典语”和“土耳其语”。 布尔参数caseSensitive指示匹配是否区分大小写(默认为false)。

例子:
假设我们有以下DataFrame和列id和raw:

id | raw
----|----------
0 | [I, saw, the, red, baloon]
1 | [Mary, had, a, little, lamb]
应用StopScriptRemover,将raw作为输入列,并过滤为输出列,我们应该得到以下结果:

id | raw                         | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, baloon] | [saw, red, baloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
在过滤时,停止词“I”,“the”,“had”和“a”已被过滤掉。

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
2.3 n-gram

n-gram是对于某个整数n的n个令牌(通常为字)的序列。 NGram类可以用于将输入特征转换为n-gram。
NGram将字符串序列(例如,Tokenizer的输出)作为输入。 参数n用于确定每个n-gram中的项的数量。 输出将包括一个n-gram序列,其中每个n-gram由n个连续字的空格分隔的字符串表示。 如果输入序列包含少于n个字符串,则不会生成输出。

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

2.4 Binarizer (二值化

二值化是将数字特征阈值为二进制(0/1)特征的过程。
Binarizer接受通用参数inputCol和outputCol以及二进制阈值。 大于阈值的特征值被二进制化为1.0; 等于或小于阈值的值被二值化为0.0。 inputCol支持Vector和Double类型。

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
2.5 PCA(主成分分析

PCA是使用正交变换将可能相关的变量的观察值的集合转换成称为主分量的线性不相关变量的值的集合的统计过程。 PCA类训练模型使用PCA将向量投影到低维空间。 下面的示例显示了如何将5维特征向量投影到3维主成分中。

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
2.6 PolynomialExpansion (多项式扩展

多项式展开是将特征扩展到多项式空间的过程,该多项式空间由原始尺寸的n度组合表示。 PolynomialExpansion类提供此功能。 下面的示例显示了如何将您的要素扩展到3度多项式空间。

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
2.7  Discrete Cosine Transform (DCT-离散余弦变换)

离散余弦变换将时域中的长度N实值序列变换为频域中的另一长度N实值序列。 DCT类提供此功能,实现DCT-II并且通过1/√2/缩放结果,使得用于变换的表示矩阵是统一的。 没有偏移被应用于变换序列(例如,变换序列的第0个元素是第0个DCT系数而不是第N / 2个)。

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
2.8 StringIndexer 字符串索引

StringIndexer将标签的字符串列编码为标签索引列。 索引位于[0,numLabels),按标签频率排序,因此最常见的标签获取索引0.如果输入列是数字,我们将其转换为字符串并索引字符串值。 当下游管道组件(如Estimator或Transformer)使用此字符串索引标签时,必须将组件的输入列设置为此字符串索引的列名称。 在许多情况下,可以使用setInputCol设置输入列。

例子
假设我们有以下DataFrame和列id和category:

id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category是具有三个标签:“a”,“b”和“c”的字符串列。 应用StringIndexer,其中category作为输入列,categoryIndex作为输出列,我们应该得到以下结果:

 id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
“a”获得索引0,因为它是最频繁的,随后是具有索引1的“c”和具有索引2的“b”。
此外,有两个策略,关于StringIndexer如何处理未见的标签,当你已经适应一个StringIndexer在一个数据集,然后使用它来转换另一个:
抛出异常(这是默认值)
请跳过包含未看见标签的行

例子
让我们回到前面的例子,但这次重用我们以前定义的StringIndexer在下面的数据集:

id | category
----|----------
0 | a
1 | b
2 | c
3 | d
如果您没有设置StringIndexer如何处理未看见的标签或将其设置为“错误”,则会抛出异常。 但是,如果您调用了setHandleInvalid(“skip”),将生成以下数据集:

 id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
请注意,包含“d”的行不会出现。

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
2.9 IndexToString

对称到StringIndexer,IndexToString将一列标签索引映射回包含原始标签的列作为字符串。 一个常见的用例是使用StringIndexer从标签生成索引,使用这些索引训练模型,并从IndexToString的预测索引列中检索原始标签。 但是,您可以*提供您自己的标签。
例子
基于StringIndexer示例,让我们假设我们有以下DataFrame和列id和categoryIndex:

 id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
应用IndexToString,将categoryIndex作为输入列,将originalCategory作为输出列,我们可以检索我们的原始标签(它们将从列的元数据中推断):

id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
2.10 OneHotEncoder(独热码)

独热编码将一列标签索引映射到二进制向量列,最多具有单个一值。 此编码允许期望连续特征(例如逻辑回归)的算法使用分类特征。

from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
2.11 VectorIndexer
VectorIndexer帮助索引向量数据集中的分类特征。它可以自动决定哪些特征是分类的,并将原始值转换为类别索引。具体来说,它执行以下操作:
1、获取Vector类型的输入列和参数maxCategories。
2、基于不同值的数量确定哪些特征应是分类的,其中最多maxCategories的特征被声明为分类。
3、为每个分类特征计算基于0的类别索引。
4、索引分类特征并将原始特征值转换为索引。
5、索引分类特征允许诸如决策树和树集合等算法适当地处理分类特征,从而提高性能。
在下面的例子中,我们读取一个标记点​​的数据集,然后使用VectorIndexer来决定哪些特征应该被视为分类。我们将分类特征值转换为它们的索引。这个变换的数据然后可以被传递到诸如DecisionTreeRegressor的处理分类特征的算法。

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
2.12 Interaction

交互是一个变换器,它使用向量或双值列,并生成单个向量列,其中包含每个输入列的一个值的所有组合的乘积。例如,如果您有两个向量类型列,每个列有3个维度作为输入列,那么您将获得一个9维向量作为输出列。
例子
假设我们有以下DataFrame的列“id1”,“vec1”和“vec2”:

id1|vec1          |vec2          
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
应用与这些输入列的交互,然后interactionedCol作为输出列包含:

id1|vec1          |vec2          |interactedCol                                         
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
setInputCols(Array("id2", "id3", "id4")).
setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
setInputCols(Array("id5", "id6", "id7")).
setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
.setInputCols(Array("id1", "vec1", "vec2"))
.setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
2.13 Normalizer (正则化

Normalizer是一个变换器,用于变换向量行的数据集,规范每个向量以具有单位范数。 它采用参数p,其指定用于归一化的p范数。 (默认情况下p = 2)。这种归一化可以帮助标准化输入数据并改进学习算法的行为。
以下示例演示如何加载libsvm格式的数据集,然后将每行标准化为具有单位L1范数和单位L∞范数。

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
2.14 StandardScaler(标准化缩放)

StandardScaler变换向量行的数据集,规范每个特征以具有单位标准偏差和/或零均值。 它需要参数:
withStd:默认为True。 将数据缩放到单位标准偏差。
withMean:默认为False。 在缩放之前用平均值居中数据。 它将构建密集输出,因此在应用于稀疏输入时要小心。
StandardScaler是一个估计器,可以适合数据集以产生一个StandardScalerModel; 这相当于计算摘要统计。 然后,模型可以将数据集中的向量列变换为具有单位标准偏差和/或零均值特征。
请注意,如果要素的标准偏差为零,则该要素的向量中将返回默认值0.0。
以下示例演示如何以libsvm格式加载数据集,然后规范化每个要素的单位标准偏差。

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
2.15 MinMaxScaler

MinMaxScaler变换矢量行的数据集,将每个要素重新缩放到特定范围(通常为[0,1])。 它需要参数:
min:默认值为0.0。 转换后的下边界,由所有特征共享。
max:1.0。 转换后的上界,由所有特征共享。
MinMaxScaler计算数据集的汇总统计信息,并生成MinMaxScalerModel。 然后,模型可以单独变换每个特征,使得它在给定范围内。
特征E的重新缩放的值被计算为,

Spark机器学习之特征提取、选择、转换

对于Spark机器学习之特征提取、选择、转换
请注意,由于零值可能会转换为非零值,即使对于稀疏输入,变压器的输出也将是DenseVector。
以下示例演示如何以libsvm格式加载数据集,然后将每个要素重新缩放为[0,1]。

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()

2.16 MaxAbsScaler

MaxAbsScaler转换Vector行的数据集,通过划分每个要素中的最大绝对值,将每个要素重新缩放到range [-1,1]。 它不会移动/居中数据,因此不会破坏任何稀疏性。MaxAbsScaler计算数据集上的汇总统计信息,并生成MaxAbsScalerModel。 然后,模型可以将每个要素单独转换为range [-1,1]。以下示例演示如何加载libsvm格式的数据集,然后将每个要素重新缩放到[-1,1]。

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

2.17 Bucketizer(连续数据离散化到指定的范围区间)

Bucketizer将一列连续要素转换为一列要素桶,其中的桶由用户指定。它需要一个参数:
split:用于将连续要素映射到bucket的参数。对于n + 1分割,有n个桶。由分割x,y定义的桶保存除了最后一个桶之外的范围[x,y)中的值,其还包括y。分割应严格增加。必须明确提供-inf,inf的值以涵盖所有Double值;否则,指定拆分之外的值将被视为错误。分割的两个示例是Array(Double.NegativeInfinity,0.0,1.0,Double.PositiveInfinity)和Array(0.0,1.0,2.0)。
注意,如果你不知道目标列的上限和下限,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为分割的边界,以防止潜在的Bucketizer边界异常。还要注意,您提供的分割必须严格按升序,即s0 <s1 <s2 <... <sn。
有关详细信息,请参阅Bucketizer的API文档。
以下示例演示如何将双列列存储到另一个索引列的列中。

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()

2.18 ElementwiseProduct

ElementwiseProduct使用元素级乘法将每个输入向量乘以提供的“权重”向量。 换句话说,它通过标量乘法器来缩放数据集的每一列。 这表示输入向量v和变换向量w之间的Hadamard乘积,以产生结果向量。

Spark机器学习之特征提取、选择、转换

下面的示例演示了如何使用变换向量值来变换向量。

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).SQLTransformershow()
2.19 SQLTransformer

SQLTransformer实现由SQL语句定义的变换。 目前我们只支持SQL语法,如“SELECT ... FROM __THIS__ ...”,其中“__THIS__”表示输入数据集的基础表。 select子句指定要在输出中显示的字段,常量和表达式,并且可以是Spark SQL支持的任何select子句。 用户还可以使用Spark SQL内置函数和UDF操作这些选定的列。 例如,SQLTransformer支持以下语句:

  • SELECT a, a + b AS a_b FROM __THIS__
  • SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
  • SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
例子
假设我们有以下DataFrame和列id,v1和v2:

id |  v1 |  v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
这是SQLTransformer
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
:语句的输出

id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
2.20 VectorAssembler

VectorAssembler是一个变换器,将给定的列列表组合成一个单一的向量列。 它有用于将由不同特征变换器生成的原始特征和特征组合成单个特征向量,以便训练ML模型,如逻辑回归和决策树。 VectorAssembler接受以下输入列类型:所有数字类型,布尔类型和向量类型。 在每一行中,输入列的值将按指定的顺序连接到向量中。
例子
假设我们有一个带有id,hour,mobile,userFeatures和clicked列的DataFrame:

id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures是一个包含三个用户特征的向量列。 我们希望将hour,mobile和userFeatures合并成一个称为特征的单一特征向量,并使用它来预测是否被点击。 如果我们将VectorAssembler的输入列设置为hour,mobile和userFeatures,并将列输出到要素,则在转换后,我们应该得到以下DataFrame:

id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

2.21 QuantileDiscretizer


3、特征选择

3.1 VectorSlicer

VectorSlicer是一个处理特征向量的变换器,并输出一个新的原始特征子集的特征向量。 从向量列中提取特征很有用。VectorSlicer接受具有指定索引的向量列,然后输出一个新的向量列,其值通过这些索引进行选择。 有两种类型的索引,

1、代表向量中的索引的整数索引,setIndices()。

2、表示向量中特征名称的字符串索引,setNames()。 这需要向量列具有AttributeGroup,因为要实现在Attribute的name字段匹配。

整数和字符串的规格都可以接受。 此外,您可以同时使用整数索引和字符串名称。 必须至少选择一个特征。 重复的功能是不允许的,所以选择的索引和名称之间不能有重叠。 请注意,如果选择了功能的名称,则会遇到空的输入属性时会抛出异常。

输出向量将首先(按照给定的顺序)对所选索引的特征进行排序,其次是所选择的名称(按照给定的顺序)。

例子
假设我们有一个DataFrame与列userFeatures:

userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures是一个包含三个用户功能的向量列。 假设userFeature的第一列全部为零,因此我们要删除它并仅选择最后两列。 VectorSlicer使用setIndices(1,2)选择最后两个元素,然后生成一个名为features的新向量列:

 userFeatures     | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
假设我们对userFeatures具有潜在的输入属性,即[“f1”,“f2”,“f3”],那么我们可以使用setNames(“f2”,“f3”)来选择它们。

 userFeatures     | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
3.2 RFormula

公式选择由R模型公式指定的列。 目前,我们支持R运算符的有限子集,包括'〜','。',':','+'和' - '。 基本操作有:

〜单独的目标和条件
+连字词,“+ 0”表示删除截取
- 删除术语,“ - 1”表示删除拦截
:交互(数字乘法或二值化分类值)
. 所有列除了目标

假设a和b是双列,我们使用以下简单的例子来说明RFormula的效果:

y〜a + b表示模型y〜w0 + w1 * a + w2 * b,其中w0是截距,w1,w2是系数。

y〜a + b + a:b - 1表示模型y〜w1 * a + w2 * b + w3 * a * b其中w1,w2,w3为系数。

RFormula产生一个特征向量列和一个标签的双列或串列。 像R在线性回归中使用公式时,字符串输入列将被单编码,数字列将被转换为双精度。 如果标签列是类型字符串,则它将首先使用StringIndexer转换为double。 如果DataFrame中不存在标签列,则会从公式中指定的响应变量创建输出标签列。

例子

假设我们有一个具有列id,country,hour和clicked的DataFrame:

id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
如果我们使用具有clicked〜country+hour的公式字符串的RFormula,这表示我们想要基于country和hour预测clicked,转换后我们应该得到以下DataFrame:

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])

formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
3.3 ChiSqSelector