HIVE: Map Join Vs Common Join, and SMB

时间:2023-03-08 16:15:48
HIVE: Map Join Vs Common Join, and SMB

HIVE 

Map Join is nothing but the extended version of Hash Join of SQL Server - just extending Hash Join into Distributed System.

SMB(Sort Merge Bucket) Join is also similar to the SQL Server Merge Join mechnism - just extending it into Distributed System.

If the tables being joined are bucketized, and the buckets are a multiple of each other, the buckets can be joined with each other. If table A has 8 buckets are table B has 4 buckets, the following join:

SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key

can be done on the mapper only. Instead of fetching B completely for each mapper of A, only the required buckets are fetched. For the query above, the mapper processing bucket 1 for A will only fetch bucket 1 of B. It is not the default behavior, and is governed by the following parameter

set hive.optimize.bucketmapjoin = true

If the tables being joined are sorted and bucketized, and the number of buckets are same, a sort-merge join can be performed. The corresponding buckets are joined with each other at the mapper. If both A and B have 4 buckets

SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM A a join B b on a.key = b.key

can be done on the mapper only. The mapper for the bucket for A will traverse the corresponding bucket for B. This is not the default behavior, and the following parameters need to be set:

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

下面进行一次简单的性能比较测试.

表结构:

hive> desc student;
OK
no double
name string
code string
Time taken: 0.568 seconds, Fetched: row(s)
hive> desc stu_add;
OK
add_code double
address string
Time taken: 0.093 seconds, Fetched: row(s)

表student大小,约470M

-rwxr-xr-x    stevenxia supergroup   -- : /user/hive/warehouse/student/part-m-00000_copy_7

表stu_add小大约 1K

Found  items
-rwxr-xr-x stevenxia supergroup -- : /user/hive/warehouse/stu_add/part-m-

运行

select s.name, a.address from student s join stu_add a on s.no = a.add_code;

进行了多次测试,结果:

序号 set hive.auto.convert.join = false; set hive.auto.convert.join = true;
1 2m 1s 35s
2 2m 9s 33s
3 2m 1s 33s

WHY?

我想主要Common Join有两点性能消耗比较多:

a. Shuffle过程,需要把各个mapper的结果写到磁盘

b. 需要把map task的结果复制到其它data node上进行reduce

这是我的理解,如有错误,不吝赐教。

reduce side join是一种最简单的join方式,其主要思想如下:

在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。

在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

reduce side join + BloomFilter
将小表中的key保存到BloomFilter中,在map阶段扫描过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已.

Semi Join,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO.

实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。

Sort Merge Bucket Join 存在的目的主要是为了解决大表与大表间的 Join 问题,分桶其实就是把大表化成了“小表”,然后 Map-Side Join 解决之,这是典型的分而治之的思想。

连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量.

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;