Spark SQL Join

Reason is the light and the light of life.

Jerry Su Mar 21, 2019 2 mins

Join in Hive

Common Join

Common Join是Hive中的默认join类型,也称为Shuffle Join, Distributed Join, Sort Merged Join
在join期间,两个表中的所有行都将根据join key分发到所有节点,来自相同join key的值最终在同一节点上。
1. In the map stage, mappers reads the tables and output the join-column value as the key. The key-value pairs are written into an intermediate file.
2. In the shuffle stage, these pairs are sorts and merged. All rows from the same key will be sent to the same reducer instance.
3. In the reduce stage, reducer gets the sorted data and performs the join.

缺点:1. shuffle操作代价高,消耗网络资源。
2. 存在典型数据倾斜问题。如果join key数据分布不均匀,则相关的reducers会数据过载,导致多数reducers已经完成join操作,而小部分reducers仍在执行join操作。整体的运行时间取决于小部分reducers。

Common Join

Map Join

Broadcast join is called Map Join in Hive.
Common join数据shuffle代价比较高。为了加速Hive查询,可以使用Map Join。
Map Join使用准则:如果join操作中,存在可以装入内存的小表即可。
在join期间,两个表中的所有行都将根据join key分发到所有节点,来自相同join key的值最终在同一节点上。

1. Map Join的第一步是在原始Map Reduce任务之前创建Map Reduce本地任务,此map/reduce任务从HDFS读取小表的数据并将其保存到内存中的哈希表中,然后保存到哈希表文件中。
2. 当原始join Map Reduce任务启动时,它会将哈希表文件移动到Hadoop分布式缓存(这将把哈希表文件填充到每个mapper的本地磁盘,即广播broadcast)。
对于具有大表A和小表B的连接,对于表A的每个映射器,完全读取表B。当较小的表被加载到内存中然后在MapReduce作业的map阶段中执行join时,不需要reducer并且跳过reduce阶段。Map Join比常规默认join执行得更快。

There are two ways to enable MapJoin in Hive.
/*+ MAPJOIN(aliasname), MAPJOIN(anothertable) */类似于C语言注释,紧跟着放在SELECT之后,指示Hive将aliasname表加载如内存。
使用提示使用Map Join指定查询。下面的示例显示较小的表b是放在提示中的表,并强制手动缓存表B。

Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key

You can force BroadcastHashJoin using SQL’s BROADCAST hint. Supported hints include BROADCAST, BROADCASTJOIN or MAPJOIN.

va q = """
select /* + broadcast (I) */
    dw_cmc_instnc_chdu I
    dw_cmc_cntct C
    C.dt >= '20150101'
val qBroadcastRight = """
SELECT /*+ MAPJOIN (rt) */ 
    range(100) lf
inner join
    range(1000) rt

Map Join

Skewed Join

Skewed Join

Bucket Join

Bucket Join

Join in Spark SQL

- shuffle map join
- broadcast map join
- sort merge join

Map Join

Map Join in Hive

Map Join时间复杂度O(m + n), 笛卡尔集运算O(m * n)
Broadcast Hash Join & Shuffle Hash Join

Broadcast Map Join


import org.apache.spark.sql.functions.broadcast

def broadcast[T](df: Dataset[T]): Dataset[T]

Marks a DataFrame as small enough for use in broadcast joins.

The following example marks the right DataFrame for broadcast hash join using joinKey.

// left and right are DataFrames
left_large_dataframe.join(broadcast(right_small_dataframe), "joinKey")

Broadcast Hash Join

Read more:

Related posts: