MapReduce编程实例6

时间:2025-04-08 17:05:31

前提准备:

1.hadoop安装运行正常。Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装

2.集成开发环境正常。集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境

MapReduce编程实例:

MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析

MapReduce编程实例(二),计算学生平均成绩

MapReduce编程实例(三),数据去重

MapReduce编程实例(四),排序

MapReduce编程实例(五),MapReduce实现单表关联

多表关联
描述:
两张表关联,如下:
左表:
factoryname address
BMW Factory 2
Benz Factory 3
Voivo Factory 4
LG Factory 5
右表:
addressID addressname
2 Beijing
3 Guangzhou
4 Shenzhen
5 Sanya
根据addressID关联求出factoryname-address表。很明显,左右关联即可,和单表关联一样。不多作表述,有需要可以查看单表关联的分析。
  1. package com.t.hadoop;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;
  15. /**
  16. * 多表排序
  17. * @author daT dev.tao@gmail.com
  18. *
  19. */
  20. public class MTJoin {
  21. public static int times = 1;
  22. public static class MTMapper extends Mapper<Object, Text, Text, Text>{
  23. @Override
  24. protected void map(Object key, Text value, Context context)
  25. throws IOException, InterruptedException {
  26. String relation = new String();
  27. String line = value.toString();
  28. if(line.contains("factoryname")||line.contains("addressID")) return;
  29. int i = 0;
  30. while(line.charAt(i)<'0'||line.charAt(i)>'9'){
  31. i++;
  32. }
  33. if(i>0){//左表
  34. relation = "1";
  35. context.write(new Text(String.valueOf(line.charAt(i))),new Text(relation + line.substring(0,i-1)));
  36. }else{//右表
  37. relation = "2";
  38. context.write(new Text(String.valueOf(line.charAt(i))),new Text(relation +line.substring(i+1)));
  39. }
  40. }
  41. }
  42. public static class MTReducer extends Reducer<Text, Text, Text, Text>{
  43. @Override
  44. protected void reduce(Text key, Iterable<Text> value,Context context)
  45. throws IOException, InterruptedException {
  46. if(times==1){
  47. context.write(new Text("factoryName"), new Text("Address"));
  48. times ++;
  49. }
  50. int factoryNum = 0;
  51. int addressNum = 0;
  52. String[] factorys = new String[10];
  53. String[] addresses = new String[10];
  54. for(Text t:value){
  55. if(t.charAt(0)=='1'){//左表
  56. factorys[factoryNum]=t.toString().substring(1);
  57. factoryNum++;
  58. }else{//右表
  59. addresses[addressNum]=t.toString().substring(1);
  60. addressNum++;
  61. }
  62. }
  63. for(int i = 0;i<factoryNum;i++){
  64. for(int j=0;j<addressNum;j++){
  65. context.write(new Text(factorys[i]), new Text(addresses[j]));
  66. }
  67. }
  68. }
  69. }
  70. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
  71. Configuration conf = new Configuration();
  72. String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
  73. if(otherArgs.length<2){
  74. System.out.println("Parameters error");
  75. System.exit(2);
  76. }
  77. Job job =new Job(conf,"MTjoin");
  78. job.setJarByClass(MTJoin.class);
  79. job.setMapperClass(MTMapper.class);
  80. job.setReducerClass(MTReducer.class);
  81. job.setOutputKeyClass(Text.class);
  82. job.setOutputValueClass(Text.class);
  83. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  84. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  85. System.exit(job.waitForCompletion(true)?0:1);
  86. }
  87. }
输出结果:
factoryName Address
BMW Factory Beijing
Benz Factory Guangzhou
Voivo Factory Shenzhen
LG Factory Sanya
欢迎同学们多多交流~