One: Implementing a custom InputFormat
* The data source comes from memory.
* 1. InputFormat is the component that handles arbitrary data sources; below is an InputFormat implementation whose data source is memory.
* 1.1 In the driver, register it with job.setInputFormatClass(MyselfMemoryInputFormat.class);
* 1.2 Extend InputFormat<K, V> and implement its two methods: getSplits(..) and createRecordReader(..).
* 1.3 getSplits(..) returns a java.util.List whose elements are InputSplits; each InputSplit corresponds to one mapper task.
* 1.4 An InputSplit is a slice of the original data source; because we deal with massive data, splitting is unavoidable. How large each InputSplit is, is entirely up to us. In this example the data is generated in memory and then wrapped in an InputSplit.
* 1.5 An InputSplit wraps Hadoop data types and implements the Writable interface.
* 1.6 A RecordReader reads the data of each InputSplit and parses it into <k,v> pairs for map to process.
* 1.7 RecordReader has four core methods: initialize(..), nextKeyValue(), getCurrentKey() and getCurrentValue().
* 1.8 initialize(..) matters because it is where the InputSplit is obtained and temporary variables are set up.
* 1.9 Each call to nextKeyValue() makes the next key and value available.
* 1.10 After nextKeyValue() is called, getCurrentKey() and getCurrentValue() are called to fetch them.
* These calls are driven by the Mapper's run method, as sketched below.
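As a rough illustration of how the framework drives these four methods, here is a simplified sketch of the new-API Mapper.run loop (the real Hadoop source adds details such as a try/finally around cleanup); context.nextKeyValue(), getCurrentKey() and getCurrentValue() delegate to the RecordReader that our InputFormat created:

public void run(Context context) throws IOException, InterruptedException {
setup(context);
// each iteration pulls one <k,v> from the RecordReader behind the context
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}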
public class MyselInputFormatApp {
private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce job writes its result into this directory
public static void main(String[] args) {
Configuration conf = new Configuration();// configuration object
try {
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
fileSystem.delete(new Path(OUT_PATH), true);
Job job = new Job(conf, MyselInputFormatApp.class.getSimpleName());// jobName: name of the job
job.setJarByClass(MyselInputFormatApp.class);
job.setInputFormatClass(MyselfMemoryInputFormat.class);
job.setMapperClass(MyMapper.class);// the custom map class
job.setMapOutputKeyClass(Text.class);// type of the map output key
job.setMapOutputValueClass(LongWritable.class);// type of the map output value
job.setReducerClass(MyReducer.class);// the custom reduce class
job.setOutputKeyClass(Text.class);// type of the reduce output key
job.setOutputValueClass(LongWritable.class);// type of the reduce output value
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// the reduce phase produces the final output; this sets where it is written
job.waitForCompletion(true);// submit to the JobTracker and wait for completion
} catch (Exception e) {
e.printStackTrace();
}
}

public static class MyMapper extends
Mapper<NullWritable, Text, Text, LongWritable> {
@Override
protected void map(NullWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splited = line.split("\t");
for (String word : splited) {
context.write(new Text(word), new LongWritable(1));// emit the word with a count of 1
}
}
}

public static class MyReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable times : values) {
count += times.get();
}
context.write(key, new LongWritable(count));
}
}

/**
* Generates data in memory and parses it into individual key/value pairs.
*/
public static class MyselfMemoryInputFormat extends InputFormat<NullWritable, Text> {
@Override
public List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException {
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
result.add(new MemoryInputSplit());
result.add(new MemoryInputSplit());
result.add(new MemoryInputSplit());
return result;
}
@Override
public RecordReader<NullWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new MemoryRecordReader();
}
}

public static class MemoryInputSplit extends InputSplit implements Writable {
int SIZE = 10;
// A plain Java array cannot be serialized by Hadoop directly,
// so the values are held in Hadoop's ArrayWritable instead of a Java array.
ArrayWritable arrayWritable = new ArrayWritable(Text.class);
/**
* First build a Java array, then convert it into the Hadoop data type.
* @throws FileNotFoundException
*/
public MemoryInputSplit() throws FileNotFoundException {
// One InputSplit feeds one map task; if the map function is to be called several times, the InputSplit must yield several key/value pairs.
Text[] array = new Text[SIZE];
Random random = new Random();
for(int i=0;i<SIZE;i++){
int nextInt = random.nextInt(999999);
Text text = new Text("Text"+nextInt);
array[i] = text ;
}
// FileInputStream fs = new FileInputStream(new File("\\etc\\profile")); // the data could also be read from a file
// and the stream's contents parsed into this data structure.
arrayWritable.set(array);
}
@Override
public long getLength() throws IOException, InterruptedException {
return SIZE;
}
@Override
public String[] getLocations() throws IOException,
InterruptedException {
return new String[]{};
}
public ArrayWritable getValues() {
return arrayWritable;
}
@Override
public void write(DataOutput out) throws IOException {
arrayWritable.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
arrayWritable.readFields(in);
}
}

public static class MemoryRecordReader extends RecordReader<NullWritable, Text> {
private Writable[] values = null ;
private Text value = null ;
private int i = 0;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
MemoryInputSplit inputSplit = (MemoryInputSplit)split;
ArrayWritable writables = inputSplit.getValues();
this.values = writables.get();
this.i = 0 ;
}
@Override
public boolean nextKeyValue() throws IOException,
InterruptedException {
if(i >= values.length){
return false ;
}
if(null == this.value){
value = new Text();
}
value.set((Text)values[i]);
i++ ;
return true;
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public Text getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// report progress as the fraction of values already consumed
return values == null || values.length == 0 ? 1.0f : (float) i / values.length;
}

/**
* Called when processing finishes; release any resources here.
*/
@Override
public void close() throws IOException {
}
}
}
Two: Implementing a custom OutputFormat
Common output formats:
TextOutputFormat: the default output format; key and value are separated by a tab.
DBOutputFormat: writes the output to a database.
SequenceFileOutputFormat: writes the key/value pairs in SequenceFile format.
SequenceFileAsBinaryOutputFormat: writes keys and values to a SequenceFile in raw binary form.
MapFileOutputFormat: writes keys and values into a MapFile. Because the keys of a MapFile must be sorted, records have to be written in key order.
MultipleOutputFormat: multi-file output. By default one reducer produces one output file, but sometimes we want a single reducer to produce several; MultipleOutputFormat and MultipleOutputs provide that (a minimal MultipleOutputs sketch follows this list).
MultipleOutputFormat also lets you customize the output file names.
Subclassing MultipleOutputFormat requires implementing:
getBaseRecordWriter()
generateFileNameForKeyValue(): determines the output file name from the key and value.
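For the new API, org.apache.hadoop.mapreduce.lib.output.MultipleOutputs can produce several named outputs from one reducer without writing a custom OutputFormat. A minimal sketch, assuming a word-count reducer like the ones in this post (the named output "words" is purely illustrative):

// Driver: register a named output (the name must be alphanumeric).
MultipleOutputs.addNamedOutput(job, "words", TextOutputFormat.class, Text.class, LongWritable.class);

// Reducer: write through a MultipleOutputs instance instead of the context.
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
private MultipleOutputs<Text, LongWritable> mos;
@Override
protected void setup(Context context) {
mos = new MultipleOutputs<Text, LongWritable>(context);
}
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable times : values) {
count += times.get();
}
mos.write("words", key, new LongWritable(count)); // lands in files named words-r-xxxxx
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}

The rest of this section implements the same idea by hand with a fully custom OutputFormat.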
/**
* Custom OutputFormat: used to write to arbitrary output destinations.
* 1. The key/value pairs the OutputFormat writes out come from the Reducer class and are handed to a RecordWriter.
* 2. RecordWriter.write(..) only receives a key and a value - where do they go? An output stream has to be passed in separately; write(..) serializes k and v into that outputStream.
* 3. The RecordWriter is obtained from the OutputFormat, so a custom OutputFormat must extend OutputFormat, and the stream object has to be created inside getRecordWriter(..).
*/
public class MySelfOutputFormatApp {
private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd/hello";// input path
private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce job writes its result into this directory
private static final String OUT_FIE_NAME = "/abc";
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
fileSystem.delete(new Path(OUT_PATH), true);
Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
job.setJarByClass(MySelfOutputFormatApp.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(MySelfTextOutputFormat.class);
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}

public static class MyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splited = line.split("\t");
for (String word : splited) {
context.write(new Text(word), new LongWritable(1));// emit the word with a count of 1
}
}
}

public static class MyReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable times : values) {
count += times.get();
}
context.write(key, new LongWritable(count));
}
}
/**
* Custom output format.
*/
public static class MySelfTextOutputFormat extends OutputFormat<Text,LongWritable>{
FSDataOutputStream outputStream = null ;
@Override
public RecordWriter<Text, LongWritable> getRecordWriter(
TaskAttemptContext context) throws IOException,
InterruptedException {
try {
FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUT_PATH), context.getConfiguration());
// path of the output file
String opath = MySelfOutputFormatApp.OUT_PATH+OUT_FIE_NAME;
outputStream = fileSystem.create(new Path(opath));
} catch (URISyntaxException e) {
e.printStackTrace();
}
return new MySelfRecordWriter(outputStream);
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
}

/**
* OutputCommitter: creates the job's output directory and temporary output directories when the job is initialized, and manages the temporary files of the job and its tasks.
* While the job runs it spawns many tasks, and each task writes its own output into such temporary directories.
* Once the tasks and the job have finished, the OutputCommitter removes the temporary directories, which is why no extra output is visible after the program ends.
* All of the temporary files produced during the run are handled by the OutputCommitter; the real output goes through the RecordWriter, so we only need to care about the final output.
*/
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
// Commits the task output: initializes the output path, cleans up when the job completes, and deletes the temporary directories of the job and its tasks.
// The argument must be the job's output path.
return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUT_PATH), context);
}
}
public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable>{
FSDataOutputStream outputStream = null ;
public MySelfRecordWriter(FSDataOutputStream outputStream) {
this.outputStream = outputStream ;
}
@Override
public void write(Text key, LongWritable value) throws IOException,
InterruptedException {
this.outputStream.writeBytes(key.toString());
this.outputStream.writeBytes("\t");
this.outputStream.writeLong(value.get());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
this.outputStream.close();
}
}
}
Three: Writing output to multiple files
/**
* Writes output to multiple files.
* Uses the old API.
*/
public class MyMultipleOutputFormatApp {
private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd";// input path
private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// output path; the reduce job writes its result into this directory
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
fileSystem.delete(new Path(OUT_PATH), true);
JobConf job = new JobConf(conf, MyMultipleOutputFormatApp.class);
job.setJarByClass(MyMultipleOutputFormatApp.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormat(MyMutipleFilesTextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
JobClient.runJob(job);
} catch (Exception e) {
e.printStackTrace();
}
}

public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] splited = line.split("\t");
for (String word : splited) {
output.collect(new Text(word), new LongWritable(1));
}
}
}

public static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
long count = 0L ;
while(values.hasNext()){
LongWritable times = values.next();
count += times.get();
}
output.collect(key, new LongWritable(count));
}
}
public static class MyMutipleFilesTextOutputFormat extends MultipleOutputFormat<Text, LongWritable> {
@Override
protected org.apache.hadoop.mapred.RecordWriter<Text, LongWritable> getBaseRecordWriter(
FileSystem fs, JobConf job, String name, Progressable progress)
throws IOException {
TextOutputFormat<Text, LongWritable> textOutputFormat = new TextOutputFormat<Text,LongWritable>();
return textOutputFormat.getRecordWriter(fs, job, name, progress);
}
@Override
protected String generateFileNameForKeyValue(Text key,
LongWritable value, String name) {
String keyString = key.toString();
if(keyString.startsWith("hello")){
return "hello";
}else{
// the output file name is simply the value of the output key (k3)
return keyString ;
}
}
}
}
Four: Word count with the Hadoop 1.x (old) API
/**
* Hadoop 1.x
* A word-count example written with the old API.
*/
public class WordCountApp {
private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd/hello";
private static final String OUT_PATH = "hdfs://hadoop1:9000/out";
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(OUT_PATH),true);
JobConf job = new JobConf(conf, WordCountApp.class);
job.setJarByClass(WordCountApp.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
JobClient.runJob(job);
} catch (IOException e) {
e.printStackTrace();
}
}
public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] splited = line.split("\t");
for (String word : splited) {
output.collect(new Text(word), new LongWritable(1L));
}
}
}

public static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
long times = 0L ;
while (values.hasNext()) {
LongWritable longWritable = (LongWritable) values.next();
times += longWritable.get();
}
output.collect(key, new LongWritable(times));
}
}
}
Five: Accepting command-line arguments at run time
/**
* Receives command-line arguments at run time.
* The Tool interface adds support for command-line arguments.
* Run from the command line as:
* hadoop jar jar.jar cmd.WordCountApp hdfs://hadoop1:9000/abd/hello hdfs://hadoop1:9000/out
* (see the note after the code on how ToolRunner treats generic options)
*/
public class WordCountApp extends Configured implements Tool {
private static String INPUT_PATH = null;// input path
private static String OUT_PATH = null;// output path; the reduce job writes its result into this directory
@Override
public int run(String[] args) throws Exception {
INPUT_PATH = args[0];
OUT_PATH = args[1];
Configuration conf = getConf();// configuration object supplied by ToolRunner via Configured
try {
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
fileSystem.delete(new Path(OUT_PATH), true);
Job job = new Job(conf, WordCountApp.class.getSimpleName());// jobName: name of the job
job.setJarByClass(WordCountApp.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);// the data input
job.setMapperClass(MyMapper.class);// the custom map class
job.setMapOutputKeyClass(Text.class);// type of the map output key
job.setMapOutputValueClass(LongWritable.class);// type of the map output value
job.setReducerClass(MyReducer.class);// the custom reduce class
job.setOutputKeyClass(Text.class);// type of the reduce output key
job.setOutputValueClass(LongWritable.class);// type of the reduce output value
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// the reduce phase produces the final output; this sets where it is written
job.waitForCompletion(true);// submit to the JobTracker and wait for completion
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
public static void main(String[] args) {
try {
ToolRunner.run(new Configuration(), new WordCountApp(),args);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splited = line.split("\t");
for (String word : splited) {
context.write(new Text(word), new LongWritable(1));// emit the word with a count of 1
}
}
}

public static class MyReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable times : values) {
count += times.get();
}
context.write(key, new LongWritable(count));
}
}
}
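Because ToolRunner passes the arguments through GenericOptionsParser before calling run(args), generic options such as -D key=value, -files or -libjars are consumed by the framework, and only the remaining arguments (the two paths here) arrive as args[0] and args[1]. An illustrative invocation (the -D property is just an example):

hadoop jar jar.jar cmd.WordCountApp -D mapred.reduce.tasks=2 hdfs://hadoop1:9000/abd/hello hdfs://hadoop1:9000/out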