MapReduce多个作业协调处理

时间:2022-12-25 05:11:01

一:背景

当数据来源不同的时候,比如用户表在MYSQL数据库中,而销售表在HDFS中,我们可以启动多个作业来依次处理这些数据源。


二:技术实现

#需求

#用户表user在MYSQL数据库中,数据如下:

1	liaozhongmin
2	lavimer
3	liao*

#销售表user_data在HDFS中,数据如下:

1	12
2	28
2	36
3	88

#我们现在的需求是要统计每个用户的销售情况,结果应该如下显示:

1	liaozhongmin	12
2	lavimer	        64
3	liao*	88


代码实现:

MultiJob1.java从数据库中读取数据并进行处理:

public class MultiJob1 {
		
	public static class Step1Mapper extends Mapper<LongWritable, User, Text, Text>{
		//创建输出的key
		private Text outKey = new Text();
		private Text outValue = new Text();
		protected void map(LongWritable key, User value, Mapper<LongWritable, User, Text, Text>.Context context) throws IOException, InterruptedException {
			//设置key
			outKey.set(String.valueOf(value.getId()));
			//设置写出去的value
			outValue.set(value.getName());
			
			//把结果写出去
			context.write(outKey, outValue);
		}
	}
	
	public static class Step1Reducer extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
			for (Text val : values){
				context.write(key, val);
			}
		}
	}
	
	/**
	 * 运行job的方法
	 * @param path
	 */
	public static void run(Map<String, String> path){
		try {
			//创建配置信息
			Configuration conf = new Configuration();
			
			//通过conf创建数据库配置信息
			DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB","root","134045");
			
			//从map集合中取出输出路径
			String step1OutPath = path.get("step1Output");
			
			//创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(step1OutPath), conf);
			
			//如果输出目录存在就删除
			if (fileSystem.exists(new Path(step1OutPath))){
				fileSystem.delete(new Path(step1OutPath),true);
			}
			
			//创建任务
			Job job = new Job(conf,MultiJob1.class.getName());
			
			//1.1 设置输入数据格式化的类和设置数据来源
			job.setInputFormatClass(DBInputFormat.class);
			DBInputFormat.setInput(job, User.class, "user", null, null, new String[]{"id","name"});
			
			//1.2 设置自定义的Mapper类和Mapper输出的key和value的类型
			job.setMapperClass(Step1Mapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);
			
			//1.3 设置分区和reduce数量(reduce的数量和分区的数量对应,因为分区只有一个,所以reduce的个数也设置为一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);
			
			//1.4 排序
			//1.5 归约
			//2.1 Shuffle把数据从Map端拷贝到Reduce端
			//job.setCombinerClass(Step1Reducer.class);
			//2.2 指定Reducer类和输出key和value的类型
			job.setReducerClass(Step1Reducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			//2.3 指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(step1OutPath));
			job.setOutputFormatClass(TextOutputFormat.class);
			
			//提交作业 然后关闭虚拟机正常退出
			job.waitForCompletion(true);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

MultiJob2.java从HDFS中读取数据并且和第一个Job处理后的结果进行合并:

public class MultiJob2 {

	// 定义一个输入路径用于判断当前处理的是来自哪里的文件
	private static String FILE_PATH = "";

	public static class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {

		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {

			// 获取文件的输入路径
			FileSplit fileSplit = (FileSplit) context.getInputSplit();
			FILE_PATH = fileSplit.getPath().toString();

			// 获取输入行记录
			String line = value.toString();
			// 抛弃无效记录(这里最好使用计数器统计一下无效记录)
			if (line == null || line.equals("")) {
				return;
			}
			// 处理来自数据库中的中间结果
			if (FILE_PATH.contains("part")) {
				// 按制表符进行切割
				String[] values = line.split("\t");
				// 当数组长度小于2的时候,视为无效记录
				if (values.length < 2) {
					return;
				}

				// 获取id和name
				String id = values[0];
				String name = values[1];
				// 把结果写出去
				context.write(new Text(id), new Text(name));
			} else if (FILE_PATH.contains("user_data")) {
				// 按制表符进行切割
				String[] values = line.split("\t");
				// 当数组长度小于2的时候,视为无效记录
				if (values.length < 2) {
					return;
				}

				// 获取id和grade
				String id = values[0];
				String score = values[1];
				// 把结果写出去
				context.write(new Text(id), new Text(score));
			}

		}
	}

	public static class Step2Reducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {


			// 用来存放来自数据库的中间结果
			Vector<String> vectorDB = new Vector<String>();
			// 用来存放来自HDFS中处理后的结果
			Vector<String> vectorHDFS = new Vector<String>();
			// 迭代数据键对应的数据添加到相应Vector中
			for (Text val : values) {
				if (val.toString().startsWith("db#")) {
					vectorDB.add(val.toString().substring(3));
				} else if (val.toString().startsWith("hdfs#")) {
					vectorHDFS.add(val.toString().substring(5));
				}
			}

			// 获取两个Vector集合的长度
			int sizeA = vectorDB.size();
			int sizeB = vectorHDFS.size();
			// 做笛卡尔积
			for (int i = 0; i < sizeA; i++) {
				for (int j = 0; j < sizeB; j++) {
					context.write(new Text(key), new Text(vectorDB.get(i) + "\t" + vectorHDFS.get(j)));
				}
			}
		}
	}

	/**
	 * 自定义Combiner
	 * 
	 * @author 廖钟民 time : 2015年1月25日下午1:39:51
	 * @version
	 */
	public static class Step2Combiner extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {

			int sum = 0;
			//处理来自数据库中的数据
			if (FILE_PATH.contains("part")) {
				for (Text val : values) {
					context.write(key, new Text("db#" + val.toString()));
				}

			} else {//处理来自HDFS中的数据
				for (Text val : values) {
					sum += Integer.parseInt(val.toString());
				}
				context.write(key, new Text("hdfs#" + String.valueOf(sum)));
			}

		}
	}

	public static void run(Map<String, String> paths) {
		try {
			// 创建配置信息
			Configuration conf = new Configuration();

			// 从Map集合中获取输入输出路径
			String step2Input1 = paths.get("step2Input1");
			String step2Input2 = paths.get("step2Input2");
			String step2Output = paths.get("step2Output");
			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(step2Output), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(step2Output))) {
				fileSystem.delete(new Path(step2Output), true);
			}

			// 创建任务
			Job job = new Job(conf, MultiJob2.class.getName());

			// 1.1 设置输入目录和设置输入数据格式化的类
			FileInputFormat.addInputPath(job, new Path(step2Input1));
			FileInputFormat.addInputPath(job, new Path(step2Input2));
			job.setInputFormatClass(TextInputFormat.class);

			//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(Step2Mapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			// 1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);

			// 1.4 排序
			// 1.5 归约
			job.setCombinerClass(Step2Combiner.class);
			
			// 2.1 Shuffle把数据从Map端拷贝到Reduce端。
			// 2.2 指定Reducer类和输出key和value的类型
			job.setReducerClass(Step2Reducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);

			// 2.3 指定输出的路径和设置输出的格式化类
			FileOutputFormat.setOutputPath(job, new Path(step2Output));
			job.setOutputFormatClass(TextOutputFormat.class);

			// 提交作业 退出
			job.waitForCompletion(true);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

MultiJobTest.java作业调度控制类:

public class MultiJobTest {

	//定义HDFS的路径
	public static final String HDFS = "hdfs://liaozhongmin:9000";
	
	
	public static void main(String[] args) {
		//定义一个map集合用于存储操作参数
		Map<String, String> paths = new HashMap<String, String>();
		
		//存储第一步的输出路径(第一步是从数据库中去取数据,没有输入路径)
		paths.put("step1Output", HDFS + "/step1_Out");
		
		//存储第二部的输入路径(第二个参数是多参数输入的)
		paths.put("step2Input1", HDFS + "/step2_inpath/user_data");
		paths.put("step2Input2", HDFS + "/step1_Out/part-*");
		paths.put("step2Output", HDFS + "/step2_out");
		
		//依次运行job
		MultiJob1.run(paths);
		MultiJob2.run(paths);
		
		System.exit(0);
	}
	
	public static JobConf config(){
		//创建配置
		JobConf conf = new JobConf(MultiJobTest.class);
		conf.setJobName("MultiJobTest");
		
		//设置配置文件
		/*conf.addResource("classpath:/hadoop/core-site.xml");
		conf.addResource("classpath:/hadoop/hdfs-site.xml");
		conf.addResource("classpath:/hadoop/mapred-site.xml");*/
		
		conf.set("io.sort.mb", "1024");
		
		return conf;
	}
}

程序运行的结果如下:

MapReduce多个作业协调处理