MapReduce (hive表SequenceFile的结果做输入)、MultipleOutputs和Reduce端迭代iterable的一些说明

时间:2024-09-14 17:05:14

  很长时间以来一直写hive,嵌套脚本、偶尔写UDF.  最近用Hive的dynamic partition和多路插入做一些事情,很遗憾的结果是非常不稳定,有时能成功,有时失败。(可能是因为hive版本的问题,查了一些资料也没查的太清楚,因为服务器不能随便动,就想用mapreduce的多路输出吧)。

1.首先这个多路插入也是用的hive的表,表的输出是SequenceFile格式。

按说sequencefile格式输入,取决于内部的Key/value格式。

在驱动类里需要添加

Job job=new Job(getConf(),"dsp_data");
  job.setInputFormatClass(SequenceFileInputFormat.class);
  SequenceFileInputFormat.addInputPath(job, input1);
  SequenceFileInputFormat.addInputPath(job, input2);

Mapper函数的输入:

public class * extends Mapper<BytesWritable , Text, TextPair,TextPair>{}

2.MultipleOutPuts使用:

private static Text value = new Text();
 private MultipleOutputs<Text, Text> mos;
 @Override
 protected void setup(Context context)  throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  mos = new MultipleOutputs<Text,Text>(context); 
 }

Iterator<TextPair> iter = values.iterator();
  TextPair middle=iter.next();
  if (! middle.getSecond().equals("0")) return;
//  String[] middle_fields=middle.getFirst().toString().split("\t",-1);
  
  
  while(iter.hasNext()){
   TextPair xx=iter.next(); 
   if (xx.getSecond().toString().equals("0")) continue;
   String[] xx_fields=xx.getFirst().toString().split("\t");
   if(xx_fields.length<3) continue;
   String custom_id=xx_fields[xx_fields.length-1];
   value.set(xx_fields[0]+"\t"+xx_fields[1]+"\t"+middle.getFirst().toString());
   mos.write(key.getFirst(), value, custom_id+"/");    
  }

@Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  super.cleanup(context);
  mos.close();
 }

 3.上面的语句有点问题。

在于middle的使用,因为reduce中iterable values使用的对象都是反序列化出来的,而指定的具体的类都是由一个初始化的对象,不断更新里面的字段实现的。

上面的例子,就造成了middle指向的对象没变,但是实际对象中的内容已经被更新成了新序列化的结果,得不到middle最初赋值地方的值。

解决办法有两个:将middle中,需要的数据部分事先取出来。   另外一个实现TextPair的clone或者实现一个get方法,获得一个新对象来解决。