Mahout源码分析:并行化FP-Growth算法

时间:2023-03-09 20:03:15
Mahout源码分析:并行化FP-Growth算法

  FP-Growth是一种常被用来进行关联分析,挖掘频繁项的算法。与Aprior算法相比,FP-Growth算法采用前缀树的形式来表征数据,减少了扫描事务数据库的次数,通过递归地生成条件FP-tree来挖掘频繁项。参考资料[1]详细分析了这一过程。事实上,面对大数据量时,FP-Growth算法生成的FP-tree非常大,无法放入内存,挖掘到的频繁项也可能有指数多个。本文将分析如何并行化FP-Growth算法以及Mahout中并行化FP-Growth算法的源码。

1. 并行化FP-Growth

  并行化FP-Growth的方法有很多,最早提出使用MapReduce并行化FP-Growth算法的应该是来自Google Beijing Research的Haoyuan Li等(见参考资料[2])。他们提出使用三次MapReduce来并行化FP-Growth,整个过程大致可以分为五个步骤:

Step 1:Sharding

  为了均衡整个集群的读写性能,将事务数据库分成若干个数据片段(shard),存储到P个节点中。

Step 2:Parallel Counting

  与WordCount类似,通过一次MapReduce来计算每一个项(item)的支持度。具体来说,每一个mapper将从hdfs中取得事务数据库的若干个数据片段(shards),所以mapper的输入是<key, value=Ti>,Ti表示数据片段中的一条数据。对于Ti中的每一个项aj,mapper输出<key=aj, value=1>。当集群中的所有mapper处理完数据之后,所有key=aj的键值对将被分配到同一个reducer,所以reducer的输入是<key=aj, value={1, 1, ... , 1}>。reducer只需要进行一次求和,然后输出<key=aj, value=sum{1, 1, ... , 1}>。最终将得到一张按照支持度递减排序的列表,称之为F-List:

Mahout源码分析:并行化FP-Growth算法

图1

Step 3:Grouping Items

  将F-List中的项(item)分为Q个组(group),每一个组都有一个唯一的group-id,我们将所有项以及其所对应的group-id记为G-List。

Step 4:Parallel FP-Growth

  这一步骤是并行化FP-Growth的关键,也是整个算法中相对难以理解的部分。这一步骤中将用到一次MapReduce。每一个mapper的输入来自第一步生成的数据片段,所以mapper的输入是<key, value=Ti>。在处理这些数据片段之前,mapper将读取第三步生成的G-List。G-List其实是一张Hashmap,键是项,值是项所对应的group-id,所占空间一般并不会很大,可以放入内存中。从Ti的最后一项开始向前扫描,或者说从右向左扫描,如果aL在G-List中对应的group-id是第一次被扫描,则输出{a0,a1,…,aL},否则不输出任何数据。以图1中所示的数据为例,假如支持度阈值为1,Q为3,那么将得到G-List:

Mahout源码分析:并行化FP-Growth算法

图2

  其中,第三列是group-id。假如mapper的输入是{牛奶,鸡蛋,面包,薯片},从最后一项开始扫描,输出<key=1,value={牛奶,鸡蛋,面包,薯片}>。之后的两项是面包和鸡蛋,其所对应的group-id和薯片相同,所以不输出任何数据。第一项是牛奶,其所对应的group-id未曾出现过,所以输出<key=2,value={牛奶}>。

  所有group-id相同的数据将被推送到同一个reducer,所以reducer的输入是<key=group-id,value={{ValueList1},{ValueList2},…,{ValueListN}}>。reducer在本地构建FP-tree,然后像传统的FP-Growth算法那样递归地构建条件FP-tree,并挖掘频繁模式。与传统的FP-Growth算法不一样的是,reducer并不直接输出所挖掘到的频繁模式,而是将其放入一个大小为K,根据支持度排序建立的大根堆,然后输出K个支持度较高的频繁模式:<key=item,reduce={包含该item的Top K Frequent Patterns}>。 

Step 5:Aggregating

  上一步挖掘到的频繁模式Top K Frequent Patterns已经包含了所有频繁模式,然而上一步的MapReduce是按照groupID来划分数据,因此key=item对应的频繁模式会存在于若干个不同groupID的reduce节点上。为了合并所有key=item的键值对,优化结果展现形式,可利用MapReduce默认对key排序的特点,对挖掘到的频繁模式进行一下处理:依次将Top K Frequent Patterns的每一个item作为key,然后输出包含该key的这一条Top K Frequent Patterns。所以,每一个mapper的输出是<key=item, value={该节点上的包含该item的频繁模式}>,reducer汇总所有mapper的输出结果,并输出最终的结果<key=item, value={包含该item的所有频繁模式}>。

2. Parallel FP-Growth源码分析

  Mahout提供了一些机器学习领域经典算法的实现。Mahout0.9之后的版本已经移除了Parallel FP-Growth算法。本文将分析Mahout0.8中Parallel FP-Growth的源码。

Mahout源码分析:并行化FP-Growth算法

图3

FPGrowthDriver.java

  FPGrowthDriver是FPGrowth算法的驱动类,继承自AbstractJob类。运行Hadoop任务一般都是通过命令行中执行bin/hadoop脚本,同时传入一些参数。ToolRunner类中的GenericOptionsParser可获取这些命令行参数。AbstractJob类封装了addInputOption,addOutputOption,addOption,parseArguments等方法,为解析命令行参数提供了帮助。params对象存储了整个算法所需要的参数。FPGrowthDriver根据命令行参数,若顺序执行,则调用该文件内的runFPGrowth方法,若并行化执行,则调用PFPGrowth.java文件中的runPFPGrowth方法。

 public final class FPGrowthDriver extends AbstractJob {

   private static final Logger log = LoggerFactory.getLogger(FPGrowthDriver.class);

   private FPGrowthDriver() {
} public static void main(String[] args) throws Exception {
//ToolRunner的静态方法run()内有GenericOptionsParser。通过GenericOptionsParser.getRemainingArgs()可获取传入的命令行参数。之后,ToolRunner.run()将调用FPGrowthDriver.run()。
ToolRunner.run(new Configuration(), new FPGrowthDriver(), args);
} /**
* Run TopK FPGrowth given the input file,
*/
@Override
public int run(String[] args) throws Exception {
addInputOption(); //添加默认的输入目录路径
addOutputOption(); //添加默认的输出目录路径 addOption("minSupport", "s", "(Optional) The minimum number of times a co-occurrence must be present."
+ " Default Value: 3", "3"); //添加支持度阈值
addOption("maxHeapSize", "k", "(Optional) Maximum Heap Size k, to denote the requirement to mine top K items."
+ " Default value: 50", "50"); //添加大根堆的大小
addOption("numGroups", "g", "(Optional) Number of groups the features should be divided in the map-reduce version."
+ " Doesn't work in sequential version Default Value:" + PFPGrowth.NUM_GROUPS_DEFAULT,
Integer.toString(PFPGrowth.NUM_GROUPS_DEFAULT)); //添加组数g
addOption("splitterPattern", "regex", "Regular Expression pattern used to split given string transaction into"
+ " itemsets. Default value splits comma separated itemsets. Default Value:"
+ " \"[ ,\\t]*[,|\\t][ ,\\t]*\" ", "[ ,\t]*[,|\t][ ,\t]*"); //添加分隔符
addOption("numTreeCacheEntries", "tc", "(Optional) Number of entries in the tree cache to prevent duplicate"
+ " tree building. (Warning) a first level conditional FP-Tree might consume a lot of memory, "
+ "so keep this value small, but big enough to prevent duplicate tree building. "
+ "Default Value:5 Recommended Values: [5-10]", "5");
addOption("method", "method", "Method of processing: sequential|mapreduce", "sequential"); //添加训练方法,顺序执行或并行执行
addOption("encoding", "e", "(Optional) The file encoding. Default value: UTF-8", "UTF-8"); //添加编码方式
addFlag("useFPG2", "2", "Use an alternate FPG implementation"); //如果解析命令行参数失败,则退出
if (parseArguments(args) == null) {
return -1;
} Parameters params = new Parameters(); if (hasOption("minSupport")) {
String minSupportString = getOption("minSupport");
params.set("minSupport", minSupportString);
}
if (hasOption("maxHeapSize")) {
String maxHeapSizeString = getOption("maxHeapSize");
params.set("maxHeapSize", maxHeapSizeString);
}
if (hasOption("numGroups")) {
String numGroupsString = getOption("numGroups");
params.set("numGroups", numGroupsString);
} if (hasOption("numTreeCacheEntries")) {
String numTreeCacheString = getOption("numTreeCacheEntries");
params.set("treeCacheSize", numTreeCacheString);
} if (hasOption("splitterPattern")) {
String patternString = getOption("splitterPattern");
params.set("splitPattern", patternString);
} String encoding = "UTF-8";
if (hasOption("encoding")) {
encoding = getOption("encoding");
}
params.set("encoding", encoding); if (hasOption("useFPG2")) {
params.set(PFPGrowth.USE_FPG2, "true");
} Path inputDir = getInputPath();
Path outputDir = getOutputPath(); params.set("input", inputDir.toString());
params.set("output", outputDir.toString()); String classificationMethod = getOption("method");
if ("sequential".equalsIgnoreCase(classificationMethod)) {
runFPGrowth(params);
} else if ("mapreduce".equalsIgnoreCase(classificationMethod)) {
Configuration conf = new Configuration();
HadoopUtil.delete(conf, outputDir);
PFPGrowth.runPFPGrowth(params);
} return 0;
}

PFPGrowth.java

  PFPGrowth是并行化FP-Growth算法的驱动类。runPFPGrowth(params)方法内初始化了一个Configuration对象,之后调用runPFPGrowth(params, conf)方法。runPFPGrowth(params, conf)方法包括了并行化FP-Growth算法的五个关键步骤。其中,startParallelCounting(params, conf)对应Step1和Step2,通过类似WordCount的方法统计每一项的支持度,其输出结果将被readFList()和saveList()用于生成FList。之后,将按照用户输入的命令行参数NUM_GROUPS来计算每一个group所含项的个数,并将其存储到params。startParallelFPGrowth(params, conf)对应Step3和Step4。startAggregating(params, conf)对应Step5。

 public static void runPFPGrowth(Parameters params, Configuration conf) throws IOException, InterruptedException, ClassNotFoundException {
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," + "org.apache.hadoop.io.serializer.WritableSerialization"); startParallelCounting(params, conf); //对应Step1和Step2 // save feature list to dcache
List<Pair<String,Long>> fList = readFList(params);
saveFList(fList, params, conf); // set param to control group size in MR jobs
int numGroups = params.getInt(NUM_GROUPS, NUM_GROUPS_DEFAULT);
int maxPerGroup = fList.size() / numGroups;
if (fList.size() % numGroups != 0) {
maxPerGroup++;
}
params.set(MAX_PER_GROUP, Integer.toString(maxPerGroup)); startParallelFPGrowth(params, conf); //对应Step3和Step4 startAggregating(params, conf); //对应Step5
}

  

  startParallelCounting方法初始化了一个Job对象。该Job对象将调用ParallelCountingMapper和ParallelCountingReducer来完成支持度的统计。

  /**
* Count the frequencies of various features in parallel using Map/Reduce
*/
public static void startParallelCounting(Parameters params, Configuration conf)
throws IOException, InterruptedException, ClassNotFoundException {
conf.set(PFP_PARAMETERS, params.toString()); conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK"); String input = params.get(INPUT);
Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
job.setJarByClass(PFPGrowth.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(input));
Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
FileOutputFormat.setOutputPath(job, outPath); HadoopUtil.delete(conf, outPath); job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(ParallelCountingMapper.class);
job.setCombinerClass(ParallelCountingReducer.class);
job.setReducerClass(ParallelCountingReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class); boolean succeeded = job.waitForCompletion(true);
if (!succeeded) {
throw new IllegalStateException("Job failed!");
} }

  

ParallelCountingMapper.java

  ParallelCountingMapper中map方法的输入分别是字节偏移量offset和事务数据库中的某一行数据input。所有input数据中多次出现的项都被视为出现一次,所以将input数据split之后存储到HashSet中。map方法的输出是<key=item, value=one>。

 public class ParallelCountingMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

     private static final LongWritable ONE = new LongWritable(1);

     private Pattern splitter;

     @Override
protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException { String[] items = splitter.split(input.toString());
Set<String> uniqueItems = Sets.newHashSet(Arrays.asList(items));
for (String item : uniqueItems) {
if (item.trim().isEmpty()) {
continue;
}
context.setStatus("Parallel Counting Mapper: " + item);
context.write(new Text(item), ONE);
}
} @Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
}
}

ParallelCountingReducer.java 

  ParallelCountingReducer中reduce方法的输入是<key=item, value={one, one, ... , one}>。所有key=item的键值对将被分配到一台机器上,所以只需要对values进行遍历求和就可以求出该item的支持度。

 public class ParallelCountingReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

     @Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
InterruptedException {
long sum = 0;
for (LongWritable value : values) {
context.setStatus("Parallel Counting Reducer :" + key);
sum += value.get();
}
context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
context.write(key, new LongWritable(sum)); }
}

PFPGrowth.java

  通过params中的OUTPUT参数可以获取ParallelCountingReducer的输出路径。在readFList这个方法中用到了几个数据结构。Pair实现了Comparable接口和Serializable接口,其数据成员first和second分别用来表示item和item所对应的支持度。PriorityQueue是一个用平衡二叉树实现的小顶堆,如果指定了Comparator,将按照Comparator对PriorityQueue中的元素进行排序,如果未指定Comparator,则将按照元素实现的Comparable接口进行排序。在并行化FP-Growth算法中,初始化PriorityQueue时指定了Comparator,其按照Pair的第一个元素进行排序,如果第一个元素相等,则按照第二个元素进行排序。通过初始化SequenceFileDirIterable来遍历上一次MapReduce输出的结果,每次将Pair添加到PriorityQueue的同时完成排序。最后,逐一将PriorityQueue中的元素取出放入fList。因此,fList是一个按照支持度递减的列表。

 /**
* read the feature frequency List which is built at the end of the Parallel counting job
*
* @return Feature Frequency List
*/
public static List<Pair<String,Long>> readFList(Parameters params) {
int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
Configuration conf = new Configuration(); Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING); PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
new Comparator<Pair<String,Long>>() {
@Override
public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
int ret = o2.getSecond().compareTo(o1.getSecond());
if (ret != 0) {
return ret;
}
return o1.getFirst().compareTo(o2.getFirst());
}
}); for (Pair<Text,LongWritable> record
: new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN),
PathType.GLOB, null, null, true, conf)) {
long value = record.getSecond().get();
if (value >= minSupport) {
queue.add(new Pair<String,Long>(record.getFirst().toString(), value));
}
}
List<Pair<String,Long>> fList = Lists.newArrayList();
while (!queue.isEmpty()) {
fList.add(queue.poll());
}
return fList;
}

  由于已经生成了fList,上一次MapReduce的输出结果已经没有用了,因此,saveFList方法首先删除了这些文件。之后,saveFList方法将flist写入到hdfs上。对于存储在hdfs上的文件,DistributedCache提供了缓存文件的功能,在Slave Node进行计算之前可将hdfs上的文件复制到这些节点上。

 /**
* Serializes the fList and returns the string representation of the List
*/
public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf)
throws IOException {
Path flistPath = new Path(params.get(OUTPUT), F_LIST);
FileSystem fs = FileSystem.get(flistPath.toUri(), conf);
flistPath = fs.makeQualified(flistPath);
HadoopUtil.delete(conf, flistPath);
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);
try {
  for (Pair<String,Long> pair : flist) {
writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));
}
} finally {
writer.close();
}
DistributedCache.addCacheFile(flistPath.toUri(), conf);
}

  startParallelFPGrowth方法初始化了一个Job对象。该Job对象将调用ParallelFPGrowthMapper和ParallelFPGrowthReducer来实现Step3和Step4。

 /**
* Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
*/
public static void startParallelFPGrowth(Parameters params, Configuration conf)
throws IOException, InterruptedException, ClassNotFoundException {
conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
Path input = new Path(params.get(INPUT));
Job job = new Job(conf, "PFP Growth Driver running over input" + input);
job.setJarByClass(PFPGrowth.class); job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(TransactionTree.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TopKStringPatterns.class); FileInputFormat.addInputPath(job, input);
Path outPath = new Path(params.get(OUTPUT), FPGROWTH);
FileOutputFormat.setOutputPath(job, outPath); HadoopUtil.delete(conf, outPath); job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(ParallelFPGrowthMapper.class);
job.setCombinerClass(ParallelFPGrowthCombiner.class);
job.setReducerClass(ParallelFPGrowthReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class); boolean succeeded = job.waitForCompletion(true);
if (!succeeded) {
 throw new IllegalStateException("Job failed!");
}
}

ParallelFPGrowthMapper.java

  ParallelFPGrowthMapper中的setup方法将在map方法之前被运行。setup方法中调用了readFList方法。注意这里的readFList方法与之前分析的readFList方法参数不一样,所以是两个完全不同的方法。这里的readFList方法通过HadoopUtil.getCachedFiles(conf)来获取缓存文件flist,将其存储到fMap,其中item作为fMap的键,item在flist中的位置序号作为fMap的值,例如flist中的第一个item,其在fMap中将是<key=item, value=0>。这样做的原因是之后将fMap分Q个group时需要用到这个位置序号。在map方法中,输入是字节偏移量和事务数据库中的某一行数据。根据用户指定的分隔符splitter来切分数据。为了过滤非频繁项,通过fMap.containsKey(item)方法来查找该项是否存在于fList中。若存在,将其所对应的位置序号加入到itemSet,否则,将其丢弃。itemArr复制itemSet中的数据,并按照位置序号递增进行排序,即按照支持度递减进行排序。之后的for循环从itemArr的最后一个元素向前遍历,如果其所对应的groupID不在groups中,那么将初始化TransactionTree,将itemArr[0],itemArr[1],…,itemArr[j]存入该TransactionTree中。groupID的计算非常简单,将位置序号除以maxPerGroup即可。TransactionTree实现了Writable和Iterable<Pair<IntArrayList, Long>>接口,初始化TransactionTree时,构造方法将参数赋值给TransactionTree中的数据成员List<Pair<IntArrayList, Long>> transactionSet。这里Pair对象存储的两个元素分别是位置序号列表和1。

 /**
* maps each transaction to all unique items groups in the transaction. mapper
* outputs the group id as key and the transaction as value
*
*/
public class ParallelFPGrowthMapper extends Mapper<LongWritable,Text,IntWritable,TransactionTree> { private final OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
private Pattern splitter;
private int maxPerGroup;
private final IntWritable wGroupID = new IntWritable(); @Override
protected void map(LongWritable offset, Text input, Context context)
throws IOException, InterruptedException { String[] items = splitter.split(input.toString()); OpenIntHashSet itemSet = new OpenIntHashSet(); for (String item : items) {
if (fMap.containsKey(item) && !item.trim().isEmpty()) {
itemSet.add(fMap.get(item));
}
} IntArrayList itemArr = new IntArrayList(itemSet.size());
itemSet.keys(itemArr);
itemArr.sort(); OpenIntHashSet groups = new OpenIntHashSet();
for (int j = itemArr.size() - 1; j >= 0; j--) {
// generate group dependent shards
int item = itemArr.get(j);
int groupID = PFPGrowth.getGroup(item, maxPerGroup); if (!groups.contains(groupID)) {
IntArrayList tempItems = new IntArrayList(j + 1);
tempItems.addAllOfFromTo(itemArr, 0, j);
context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
wGroupID.set(groupID);
context.write(wGroupID, new TransactionTree(tempItems, 1L));
}
groups.add(groupID);
} } @Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context); int i = 0;
for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
fMap.put(e.getFirst(), i++);
} Parameters params =
new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN,
PFPGrowth.SPLITTER.toString())); maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
}
}

ParallelFPGrowthReducer.java

  ParallelFPGrowthReducer的输入是<key=groupID, value={TransactionTree1, TransactionTree2, … , TransactionTreeN}>。setup方法获取了参数params,并且通过PFPGrowth.readFList(conf)方法获取了缓存文件flist,将频繁项存入featureReverseMap,将频繁项对应的支持度存入freqList。之前分析到ParallelFPGrowthMapper输出的TransactionTree其实是List<Pair<IntArrayList, Long>> transactionSet。在ParallelFPGrowthReducer内初始化了一个TransactionTree,虽然这个TransactionTree与之前的Transaction是同一个类,但是是一棵用二维数组实现的树。考虑到文章篇幅,建树的过程这里不作分析。假设已经建好了这棵树,cTree.generateFList方法遍历这棵树,返回Map<Integer, MutableLong> frequencyList。具体的遍历方法这里不作详细分析,提一下其调用过程:TransactionTree实现Iterator<Pair<IntArrayList, Long>>接口时重写了iterator方法,在generateFList方法中通过iterator方法生成一个迭代器来遍历整棵树。iterator方法返回的是TransactionTreeIterator对象。TransactionTreeIterator对象继承自AbstractIterator<Pair<IntArrayList, Long>>,实现了对TransactionTree进行遍历。localFList合并了generateFList的结果并按照支持度递减进行排序。生成频繁模式的方法有两种,用户可以自己选择来调用FPGrowthIds.generateTopKFrequentPatterns方法或者fpGrowth.generateTopKFrequentPatterns方法来生成频繁模式,本文将对后者进行分析。在ParallelFPGrowthReducer中还有一个IteratorAdapter类。它是设计模式中十分经典的适配器模式的具体应用,可以将两个不同类型的迭代器解耦。ParallelFPGrowthReducer的输出是<key=item, value={Top K Frequent Patterns}>。

 /**
* takes each group of transactions and runs Vanilla FPGrowth on it and
* outputs the the Top K frequent Patterns for each group.
*
*/
   public final class ParallelFPGrowthReducer extends Reducer<IntWritable,TransactionTree,Text,TopKStringPatterns> { private final List<String> featureReverseMap = Lists.newArrayList();
private final LongArrayList freqList = new LongArrayList();
private int maxHeapSize = 50;
private int minSupport = 3;
private int numFeatures;
private int maxPerGroup;
private boolean useFP2; private static final class IteratorAdapter implements Iterator<Pair<List<Integer>,Long>> {
private final Iterator<Pair<IntArrayList,Long>> innerIter; private IteratorAdapter(Iterator<Pair<IntArrayList,Long>> transactionIter) {
innerIter = transactionIter;
} @Override
public boolean hasNext() {
return innerIter.hasNext();
} @Override
public Pair<List<Integer>,Long> next() {
Pair<IntArrayList,Long> innerNext = innerIter.next();
return new Pair<List<Integer>,Long>(innerNext.getFirst().toList(), innerNext.getSecond());
} @Override
public void remove() {
throw new UnsupportedOperationException();
}
} @Override
protected void reduce(IntWritable key, Iterable<TransactionTree> values, Context context) throws IOException {
TransactionTree cTree = new TransactionTree();
for (TransactionTree tr : values) {
for (Pair<IntArrayList,Long> p : tr) {
cTree.addPattern(p.getFirst(), p.getSecond());
}
} List<Pair<Integer,Long>> localFList = Lists.newArrayList();
for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) {
localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue().toLong()));
} Collections.sort(localFList, new CountDescendingPairComparator<Integer,Long>()); if (useFP2) {
FPGrowthIds.generateTopKFrequentPatterns(
cTree.iterator(),
freqList,
minSupport,
maxHeapSize,
PFPGrowth.getGroupMembers(key.get(), maxPerGroup, numFeatures),
new IntegerStringOutputConverter(
new ContextWriteOutputCollector<IntWritable, TransactionTree, Text, TopKStringPatterns>(context),
featureReverseMap),
new ContextStatusUpdater<IntWritable, TransactionTree, Text, TopKStringPatterns>(context));
} else {
FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
fpGrowth.generateTopKFrequentPatterns(
new IteratorAdapter(cTree.iterator()),
localFList,
minSupport,
maxHeapSize,
Sets.newHashSet(PFPGrowth.getGroupMembers(key.get(),
maxPerGroup,
numFeatures).toList()),
new IntegerStringOutputConverter(
new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context),
featureReverseMap),
new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context));
}
} @Override
protected void setup(Context context) throws IOException, InterruptedException { super.setup(context);
Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, "")); for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
featureReverseMap.add(e.getFirst());
freqList.add(e.getSecond());
} maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50"));
minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3")); maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
numFeatures = featureReverseMap.size();
useFP2 = "true".equals(params.get(PFPGrowth.USE_FPG2));
}

TransactionTree.java

  在分析fpGrowth.generateTopKFrequentPatterns方法之前,先来分析一下建树过程中使用的addPattern方法。下面的代码列出了TransactionTree的数据成员和addPattern方法。在addPattern方法中,首先从根节点开始与myList中的节点进行比较。childWithAttribute返回temp节点下的孩子节点中是否有和attributeValue名称相同的节点。如果没有,addCountMode置为false,将myList中剩余的节点添加到这棵树中;如果有,则通过addCount方法增加child节点的支持度。这一建树的思路与传统的FP-Growth中建树的思路完全一致。

 private int[] attribute; //节点的名称属性
private int[] childCount; //对该节点的有多少个孩子节点进行计数
private int[][] nodeChildren; //二维数组,记录每一个节点的孩子节点
private long[] nodeCount; //当前节点的支持度计数
private int nodes;
private boolean representedAsList; //true表示以List形式展现,false表示以树的形式展现
private List<Pair<IntArrayList,Long>> transactionSet; public int addPattern(IntArrayList myList, long addCount) {
int temp = ROOTNODEID;
int ret = 0;
boolean addCountMode = true;
for (int idx = 0; idx < myList.size(); idx++) {
int attributeValue = myList.get(idx);
int child;
if (addCountMode) {
child = childWithAttribute(temp, attributeValue);
if (child == -1) {
addCountMode = false;
} else {
addCount(child, addCount);
temp = child;
}
}
if (!addCountMode) {
child = createNode(temp, attributeValue, addCount);
temp = child;
ret++;
}
}
return ret;
}

FPGrowth.java

  generateTopKFrequentPatterns方法的形参有transactionStream,frequencyList,minSupport,k,Collection<A> returnableFeatures,OutputCollector<A, List<Pair<List<A>, Long>>> output,Statusupdater updater。其中,transactionStream是根据当前key=groupID所对应的Pair<List<A>, Long>类型的values建立的cTree,这里Pair的第一项是位置序号,第二项是1;frequencyList是ParallelFPGrowthReducer中的localFList,其第一项是位置序号,第二项是支持度;Collection<A> returnableFeatures是当前key=group-id所包含的位置序号集合。

  attributeIdMapping过滤了transactionStream中的非频繁项,并为频繁项分配新id,将其映射成<key=位置序号, value=id>。reverseMapping倒置了attributeIdMapping的映射关系。attributeFrequentcy记录了索引为id的项的支持度。对于returnableFeatures中的位置序号进行遍历,过滤非频繁项,returnFeatures记录了剩余的频繁项。之后调用generateTopKFrequentPatterns方法来构建本地的FP-tree和头表(Header-Table),并遍历FP-tree来输出频繁项。参考资料[1]详细分析了这一过程,这里不作进一步分析,需要注意到是在Mahout中FP-tree是以数组的形式存储。

 /**
* Generate Top K Frequent Patterns for every feature in returnableFeatures
* given a stream of transactions and the minimum support
*
* @param transactionStream
* Iterator of transaction
* @param frequencyList
* list of frequent features and their support value
* @param minSupport
* minimum support of the transactions
* @param k
* Number of top frequent patterns to keep
* @param returnableFeatures
* set of features for which the frequent patterns are mined. If the
* set is empty or null, then top K patterns for every frequent item (an item
* whose support> minSupport) is generated
* @param output
* The output collector to which the the generated patterns are
* written
* @throws IOException
*/
public final void generateTopKFrequentPatterns(Iterator<Pair<List<A>,Long>> transactionStream,
Collection<Pair<A, Long>> frequencyList,
long minSupport,
int k,
Collection<A> returnableFeatures,
OutputCollector<A,List<Pair<List<A>,Long>>> output,
StatusUpdater updater) throws IOException { Map<Integer,A> reverseMapping = Maps.newHashMap();
Map<A,Integer> attributeIdMapping = Maps.newHashMap(); int id = 0;
for (Pair<A,Long> feature : frequencyList) {
A attrib = feature.getFirst();
Long frequency = feature.getSecond();
if (frequency >= minSupport) {
attributeIdMapping.put(attrib, id);
reverseMapping.put(id++, attrib);
}
} long[] attributeFrequency = new long[attributeIdMapping.size()];
for (Pair<A,Long> feature : frequencyList) {
A attrib = feature.getFirst();
Long frequency = feature.getSecond();
if (frequency < minSupport) {
break;
}
attributeFrequency[attributeIdMapping.get(attrib)] = frequency;
} log.info("Number of unique items {}", frequencyList.size()); Collection<Integer> returnFeatures = Sets.newHashSet();
if (returnableFeatures != null && !returnableFeatures.isEmpty()) {
for (A attrib : returnableFeatures) {
if (attributeIdMapping.containsKey(attrib)) {
returnFeatures.add(attributeIdMapping.get(attrib));
log.info("Adding Pattern {}=>{}", attrib, attributeIdMapping
.get(attrib));
}
}
} else {
for (int j = 0; j < attributeIdMapping.size(); j++) {
returnFeatures.add(j);
}
} log.info("Number of unique pruned items {}", attributeIdMapping.size());
generateTopKFrequentPatterns(new TransactionIterator<A>(transactionStream,
attributeIdMapping), attributeFrequency, minSupport, k, reverseMapping
.size(), returnFeatures, new TopKPatternsOutputConverter<A>(output,
reverseMapping), updater); }

  AggregatorMapper的输入是<key, value=TopKStringPatterns>,TopKStringPatterns是一个存储<Pair<List<String>,Long>>类型的列表,List<String>类型元素记录了每一个key=item对应的频繁模式,Long类型元素记录了支持度。

 /**
*
* outputs the pattern for each item in the pattern, so that reducer can group them
* and select the top K frequent patterns
*
*/
public class AggregatorMapper extends Mapper<Text,TopKStringPatterns,Text,TopKStringPatterns> { @Override
protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
InterruptedException {
for (Pair<List<String>,Long> pattern : values.getPatterns()) {
for (String item : pattern.getFirst()) {
List<Pair<List<String>,Long>> patternSingularList = Lists.newArrayList();
patternSingularList.add(pattern);
context.setStatus("Aggregator Mapper:Grouping Patterns for " + item);
context.write(new Text(item), new TopKStringPatterns(patternSingularList));
}
} }
}

  

  AggregatorReducer汇总了所有Key相同的item,然后按照支持度递减排序,最终输出Top K个频繁模式。

 /**
*
* groups all Frequent Patterns containing an item and outputs the top K patterns
* containing that particular item
*
*/
public class AggregatorReducer extends Reducer<Text,TopKStringPatterns,Text,TopKStringPatterns> { private int maxHeapSize = 50; @Override
protected void reduce(Text key, Iterable<TopKStringPatterns> values, Context context) throws IOException,
InterruptedException {
TopKStringPatterns patterns = new TopKStringPatterns();
for (TopKStringPatterns value : values) {
context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
patterns = patterns.merge(value, maxHeapSize);
}
context.write(key, patterns); } @Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Parameters params = new Parameters(context.getConfiguration().get("pfp.parameters", ""));
maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50")); }
}

3. 讨论 

  并行化FP-Growth算法解决了大数据量时传统FP-Growth的性能瓶颈。除了并行化FP-Growth算法外,还有许多方法可以优化FP-Growth算法,比如并行化FP-Growth算法时考虑负载均衡,采用极大频繁项集和闭频繁项集表示频繁模式。

  • 极大频繁项集

  极大频繁项集是这样的频繁项集,它的直接超集都不是频繁的。极大频繁项集形成了可以导出所有频繁项集的最小项集集合,但是极大频繁项集却不包含它们子集的支持度信息。

  • 闭频繁项集

  如果项集的直接超集都不具有和它相同的支持度并且该项集的支持度大于或等于最小支持度阈值,则该项集是闭频繁项集。闭频繁项集提供了频繁项集的一种最小表示,该表示不丢失支持度信息。

4. 参考资料

[1] 关联分析:FP-Growth算法. Mark Lin. datahunter. 2014. [Link]

[2] PFP: Parallel FP-Growth for Query Recommendation. Haoyuan Li etc. RecSys '08 Proceedings of the 2008 ACM conference on Recommender systems. 2008. [PDF]