MapReduce 多表连接

时间:2023-03-08 15:45:32
题目描述:
现在有两个文件,1为存放公司名字和城市ID,2为存放城市ID和城市名
表一:
factoryname,addressed
Beijing Red Star,1
Shenzhen Thunder,3
Guangzhou Honda,2
Beijing Rising,1
Guangzhou Development Bank,2
Tencent,3
Back of Beijing,1
表2:
1,Beijing
2,Guangzhou
3,Shenzhen
4,Xian
现在要求输出公司名和城市名。例如:
Beijing Red Star Beijing
这个类似数据库里的多表连接。整体思路和单表连接差不多。还是利用reduce阶段对城市ID进行归并,我们在map阶段统一输出key=城市ID value=falg+“+”+城市名or公司名。然后通过reduce对flag的解析,分析后者是城市名还是公司名,并放到两个数组中,最后利用笛卡尔积将其输出
具体代码
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
                 public void map(LongWritable ikey, Text ivalue, Context context )
                                                 throws IOException, InterruptedException {
                                String line=ivalue.toString();
                                StringTokenizer st= new StringTokenizer(line,"," );
                                String value0=st.nextToken();
                                String value1=st.nextToken();
                                 if(value0.compareTo("factoryname" )!=0){
                                                 if(value0.length()==1){
                                                                 context.write(new Text(value0), new Text("1" +"+"+value1));
                                                } else{
                                                                 context.write(new Text(value1), new Text("2" +"+"+value0));
                                                }
                                }
                }
}
public class MyReducer extends Reducer<Text, Text, Text, Text> {
                 public void reduce(Text _key, Iterable<Text> values, Context context)
                                                 throws IOException, InterruptedException {
                                 // process values
                                ArrayList<String> address= new ArrayList<String>();
                                ArrayList<String> factory= new ArrayList<String>();
                                 for (Text val : values) {
                                                String line=val.toString();
                                                StringTokenizer st=new StringTokenizer(line,"+" );
                                                 int flag=Integer.parseInt(st.nextToken());
                                                 if(flag==1){
                                                                String addressname=st.nextToken();
                                                                 address.add(addressname);
                                                } else if (flag==2){
                                                                String factoryname=st.nextToken();
                                                                factory.add(factoryname);
                                                }
                                }
                                 if(address.size()!=0&&factory.size()!=0){
                                                 for(int i=0;i<address.size();i++){
                                                                 for(int j=0;j<factory.size();j++){
                                                                                context.write( new Text(address.get(i)),new Text(factory.get(j)));
                                                                }
                                                }
                                }
                }
}