hadoop的压缩解压缩,reduce端join,map端join

时间:2022-12-21 05:41:00

hadoop的压缩解压缩

    hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce输出的压缩这个事可以决定的.
    map输出压缩以后,在往reduce这块传输的过程中,传输数据量就少了.如果map输出量很大,可以采用map端的输出压缩,对我们的整个输出作业是有好处的,
选择压缩算法关注点:
      1.是否支持分隔.如果不支持分隔,那么整个 一个输入文件就会作为数据的一个输入源处理.不能分隔的意思就是一个T的数据交给一个map处理.
      2.压缩解压缩速度如何.通常情况下,hadoop操作的都是磁盘/IO密集型操作.运算的瓶颈一般都是在磁盘IO上,CPU这块利用率是不高的,由算法决定的,mapreducer这种算法一般都没有什么递归这种操作.CPU占用其实不高,并且mapreduce适合处理海量数据,所以数据量一般都是超大,就需要读磁盘,所以我们的mapareduce一般都是磁盘IO密集型的这种操作.
      MapReduce的输出进行压缩:
      //map端输出进行压缩
      conf.setBoolean("mapred.compress.map,output",true);
      //reduce端进行压缩
      conf.setBoolean("mapred.output.compress",true);
      //reduce端输出压缩使用的类:
      conf.setClass("mapred.output.compression.codec".GzipCodec.class,CompressionCodec.class);

reduce端join:

    数据处理过程中,处理的文件不是来自于一批文件,可能来自于多批文件,这两批文件之间是有联系的,这时候就涉及join
    在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每一条数据打个标签(tag),比如tag=0表示来自于file1,tag=2的来自于file2.即map阶段的主要任务就是对不同文件中的数据打标签.
    在reduce阶段,reduce函数就会获取key相同的来与file1和file2文件的value list,然后对于同一个key,对于file1和file2中的数据进行join(笛卡尔积).即:reduce 进行实际的连接操作.
      当map读取原始文件时,能不能区分出是file1还是file2?
      能. FileSplit fileSplit = (FileSplit)context.getInputSplit();
         String path= fileSplit.getPath().toString();
      map输出的时候,如何打标记?
      当我们判断出是file1时,对v2做标记,让v2的值是#zhangsan 如果是file2时,让v2的值是*45

map端join:

    之所以存在reduce端的join,是因为map阶段不能获取所有需要的join字段,即同一个key对应的字段可能位于不同map中,reduce端join是非常低效的,因为shuffle阶段要进行大量的数据传输.
    reduce端虽然是低效的,但并不能完全代替reduce端,因为map端适应于特定的场景,
    map端的join是针对以下场景进行的优化:

      在两个连接的表中,有一个表非常大,而另一个表非常小,以至于小表可以直接放在内存中.这样我们就可以.
    将小表的记录放在map端的内存中,只需要在map端读取进来的文件是大表就可以了,在map端对这个记录进行join操作.两张表的信息都读进去,数据就又混了,我们的用户信息表文件不大,可以单独划分一个文件的路径,不使用map读,可以使用在setup(..)中,使用FileSystem把记录读进来,解析出来的数据放到全局变量Map<Integer,String>,然后在map中读取进来的都是销售金额,就可以根据金额表的id去map中去找数据,做join操作.
    适用场景:

      小表可以完全读取到内存中,两个在内存中装不小的大表,不适合map端join.
    在一个TaskTracker中可以运行多个map任务,每个map任务都是一个java进程,
    map阶段通过setup()读取HDFS中的数据这种做法没有任何问题,但是如果有很多的map,意味着每一次都要建立setup()读取,每一次都要从HDFS中读取,建立很多文件,这样的话就有点浪费了.
    如果每个map从HDFS中读取相同的小表内容,就有点浪费了,使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从linux磁盘加载数据就行了,不必每次从HDFS加载.
    (1),作业提交之前,用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI.JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上.
    (2),在Mapper类的setup(),用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,读取文件内容,缓存到内存中就可以加载数据了.
写入磁盘的过程是在运行之前执行的.