Chapter 4. Joins (SQL and Core)

Joining data is an important part of many of our pipelines, and both Spark Core and SQL support the same fundamental types of joins. While joins are very common and powerful, they warrant special performance consideration as they may require large network transfers or even create datasets beyond our capability to handle.1 In core Spark it can be more important to think about the ordering of operations, since the DAG optimizer, unlike the SQL optimizer, isn’t able to re-order or push down filters.

Core Spark Joins

In this section we will go over the RDD type joins. Joins in general are expensive since they require that corresponding keys from each RDD are located at the same partition so that they can be combined locally. If the RDDs do not have known partitioners, they will need to be shuffled so that both RDDs share a partitioner and data with the same keys lives in the same partitions, as shown in Figure 4-1. If they have the same partitioner, the data may be colocated, as in Figure 4-3, so as to avoid network transfer. Regardless of whether the partitioners are the same, if one (or both) of the RDDs has a known partitioner, only a narrow dependency is created, as in Figure 4-2. As with most key/value operations, the cost of the join increases with the number of keys and the distance the records have to travel in order to get to their correct partition.

Figure 4-1. Shuffle join
Figure 4-2. One known partitioner join
Figure 4-3. Colocated join
Tip

Two RDDs will be colocated if they have the same partitioner and were shuffled as part of the same action.

Tip

Core Spark joins are implemented using the cogroup function. We discuss cogroup in “Co-Grouping”.

Choosing a Join Type

The default join operation in Spark includes only values for keys present in both RDDs, and in the case of multiple values per key, produces every possible pairing of the values for that key. The best scenario for a standard join is when both RDDs contain the same set of distinct keys. With duplicate keys, the size of the data may expand dramatically, causing performance issues, and if a key is not present in both RDDs you will lose that row of data. Here are a few guidelines:

  • When both RDDs have duplicate keys, the join can cause the size of the data to expand dramatically. It may be better to perform a distinct or combineByKey operation to reduce the key space or to use cogroup to handle duplicate keys instead of producing the full cross product (see the sketch after this list). By using smart partitioning during the combine step, it is possible to prevent a second shuffle in the join (we will discuss this in detail later).

  • If keys are not present in both RDDs you risk losing your data unexpectedly. It can be safer to use an outer join, so that you are guaranteed to keep all the data in either the left or the right RDD, then filter the data after the join.

  • If one RDD contains only an easy-to-define subset of the keys in the other, you may be better off filtering or reducing the larger RDD before the join, to avoid a big shuffle of data that you will ultimately throw away anyway.
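
For instance, cogroup keeps the values from each RDD grouped per key instead of emitting their cross product. Here is a minimal sketch, assuming two hypothetical RDDs keyed by panda id (scores with possibly many scores per panda, and addresses with possibly several addresses per panda):

    // One record per key, with all values from each side grouped together,
    // instead of every (score, address) combination that join would produce.
    val grouped: RDD[(Long, (Iterable[Double], Iterable[String]))] =
      scores.cogroup(addresses)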

Tip

Join is one of the most expensive operations you will commonly use in Spark, so it is worth doing what you can to shrink your data before performing a join.

For example, suppose you have one RDD with some data in the form (Panda id, score) and another RDD with (Panda id, address), and you want to send each panda some mail with her best score. You could join the RDDs on id and then compute the best score for each address, as shown in Example 4-1.

Example 4-1. Basic RDD join
  def joinScoresWithAddress1(scoreRDD: RDD[(Long, Double)],
      addressRDD: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
    // Join per score, then keep the (score, address) pair with the best score
    // for each panda.
    val joinedRDD = scoreRDD.join(addressRDD)
    joinedRDD.reduceByKey((x, y) => if (x._1 > y._1) x else y)
  }

However, this is probably not as fast as first reducing the score data, so that the first dataset contains only one row for each panda with her best score, and then joining that data with the address data (as shown in Example 4-2).

Example 4-2. Pre-filter before join
  def joinScoresWithAddress2(scoreRDD: RDD[(Long, Double)],
      addressRDD: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
    // Reduce to one best score per panda before joining, shrinking the shuffle.
    val bestScoreData = scoreRDD.reduceByKey((x, y) => if (x > y) x else y)
    bestScoreData.join(addressRDD)
  }

If each panda had 1,000 different scores, then the size of the shuffle in the first approach would be 1,000 times the size of the shuffle in this approach!

If we wanted to, we could also perform a left outer join to keep all keys for processing, even those missing in the right RDD, by using leftOuterJoin in place of join, as in Example 4-3. Spark also has fullOuterJoin and rightOuterJoin, depending on which records we wish to keep. Any missing values are None and present values are Some('x').

Example 4-3. Basic RDD left outer join
  def outerJoinScoresWithAddress(scoreRDD: RDD[(Long, Double)],
      addressRDD: RDD[(Long, String)]): RDD[(Long, (Double, Option[String]))] = {
    // Keep every score, with None where a panda has no known address.
    val joinedRDD = scoreRDD.leftOuterJoin(addressRDD)
    joinedRDD.reduceByKey((x, y) => if (x._1 > y._1) x else y)
  }

Choosing an Execution Plan

In order to join data, Spark needs the data that is to be joined (i.e., the data based on each key) to live on the same partition. The default implementation of a join in Spark is a shuffled hash join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. While this approach always works, it can be more expensive than necessary because it requires a shuffle. The shuffle can be avoided if:

  1. Both RDDs have a known partitioner.

  2. One of the datasets is small enough to fit in memory, in which case we can do a broadcast hash join (we will explain what this is later).

Note that if the RDDs are colocated the network transfer can be avoided, along with the shuffle.

Speeding up joins by assigning a known partitioner

If you have to do an operation before the join that requires a shuffle, such as aggregateByKey or reduceByKey, you can prevent the shuffle by adding a hash partitioner with the same number of partitions as an explicit argument to the first operation before the join. You could make the example in the previous section even faster by using the partitioner for the address data as an argument for the reduceByKey step, as in Example 4-4 and Figure 4-4.

Example 4-4. Known partitioner join
  def joinScoresWithAddress3(scoreRDD: RDD[(Long, Double)],
      addressRDD: RDD[(Long, String)]): RDD[(Long, (Double, String))] = {
    // If addressRDD has a known partitioner we should use that,
    // otherwise it has a default hash partitioner, which we can reconstruct by
    // getting the number of partitions.
    val addressDataPartitioner = addressRDD.partitioner match {
      case Some(p) => p
      case None => new HashPartitioner(addressRDD.partitions.length)
    }
    // Reducing with addressRDD's partitioner means the subsequent join needs
    // no additional shuffle.
    val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner,
      (x, y) => if (x > y) x else y)
    bestScoreData.join(addressRDD)
  }
Tip

If the RDDs sharing the same partitioner are materialized by the same action, they will end up being co-located (which can even reduce network traffic).

Tip

(Almost) always persist after repartitioning.
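
For example, in Example 4-4 we could persist the reduced RDD so that later joins against it reuse the already partitioned data instead of redoing the shuffle (a minimal sketch reusing the names from Example 4-4):

    // Persist after the reduceByKey that assigned the partitioner so the
    // repartitioned data is kept around for this and any later joins.
    val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner,
      (x, y) => if (x > y) x else y).persist()
    bestScoreData.join(addressRDD)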

Figure 4-4. Both known partitioner join

Speeding up joins using a broadcast hash join

A broadcast hash join pushes one of the RDDs (the smaller one) to each of the worker nodes. Then it does a map-side combine with each partition of the larger RDD. If one of your RDDs can fit in memory or can be made to fit in memory it is always beneficial to do a broadcast hash join, since it doesn’t require a shuffle. Sometimes (but not always) Spark SQL will be smart enough to configure the broadcast join itself; in Spark SQL this is controlled with spark.sql.autoBroadcastJoinThreshold and spark.sql.broadcastTimeout. This is illustrated in Figure 4-5.

Figure 4-5. Broadcast hash join

Spark Core does not have an implementation of the broadcast hash join. Instead, we can manually implement a version of the broadcast hash join by collecting the smaller RDD to the driver as a map, then broadcasting the result, and using mapPartitions to combine the elements.

Example 4-5 is a general function that could be used to join a larger and smaller RDD. Its behavior mirrors the default “join” operation in Spark. We exclude elements whose keys do not appear in both RDDs.

Example 4-5. Manual broadcast hash join
  def manualBroadCastHashJoin[K : Ordering : ClassTag, V1 : ClassTag,
      V2 : ClassTag](bigRDD: RDD[(K, V1)],
      smallRDD: RDD[(K, V2)]): RDD[(K, (V1, V2))] = {
    // Collect the smaller RDD to the driver and broadcast it to every worker.
    val smallRDDLocal = smallRDD.collectAsMap()
    val smallRDDLocalBcast = bigRDD.sparkContext.broadcast(smallRDDLocal)
    // Combine map-side: keep only keys that appear in both RDDs, as join would.
    bigRDD.mapPartitions(iter => {
      iter.flatMap {
        case (k, v1) =>
          smallRDDLocalBcast.value.get(k) match {
            case None => Seq.empty[(K, (V1, V2))]
            case Some(v2) => Seq((k, (v1, v2)))
          }
      }
    }, preservesPartitioning = true)
  }

Partial manual broadcast hash join

Sometimes not all of our smaller RDD will fit into memory, but some keys are so overrepresented in the large dataset that you want to broadcast just the most common keys. This is especially useful if one key is so large that it can’t fit on a single partition. In this case you can use countByKeyApprox2 on the large RDD to get an approximate idea of which keys would most benefit from a broadcast. You then filter the smaller RDD for only these keys, collecting the result locally in a HashMap. Using sc.broadcast you can broadcast the HashMap so that each worker only has one copy and manually perform the join against the HashMap. Using the same HashMap you can then filter your large RDD down to not include the large number of duplicate keys and perform your standard join, unioning it with the result of your manual join. This approach is quite convoluted but may allow you to handle highly skewed data you couldn’t otherwise process.
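
A minimal sketch of this approach is shown below. The helper name, the choice of the 100 most common keys, and the 10-second timeout passed to countByKeyApprox are all illustrative and should be tuned for your data; the same imports as the earlier examples (org.apache.spark.rdd.RDD, scala.reflect.ClassTag) are assumed.

  def partialManualBroadcastHashJoin[K : ClassTag, V1 : ClassTag, V2 : ClassTag](
      bigRDD: RDD[(K, V1)],
      smallRDD: RDD[(K, V2)],
      topK: Int = 100): RDD[(K, (V1, V2))] = {
    // Approximate per-key counts on the large RDD; keep the most common keys.
    val approxCounts = bigRDD.countByKeyApprox(timeout = 10000L).getFinalValue()
    val commonKeys = approxCounts.toSeq
      .sortBy { case (_, count) => -count.mean }
      .take(topK)
      .map { case (k, _) => k }
      .toSet
    val commonKeysBcast = bigRDD.sparkContext.broadcast(commonKeys)

    // Broadcast only the small-RDD entries for the overrepresented keys.
    val smallCommon = smallRDD
      .filter { case (k, _) => commonKeysBcast.value(k) }
      .collectAsMap()
    val smallCommonBcast = bigRDD.sparkContext.broadcast(smallCommon)

    // Join the common keys map-side against the broadcast map (no shuffle).
    val joinedCommon = bigRDD.mapPartitions { iter =>
      iter.flatMap { case (k, v1) =>
        smallCommonBcast.value.get(k).map(v2 => (k, (v1, v2)))
      }
    }

    // Join everything else with a normal shuffle join and union the results.
    val joinedRest = bigRDD.filter { case (k, _) => !commonKeysBcast.value(k) }
      .join(smallRDD.filter { case (k, _) => !commonKeysBcast.value(k) })
    joinedCommon.union(joinedRest)
  }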

Spark SQL Joins

Spark SQL supports the same basic join types as core Spark, but the optimizer is able to do more of the heavy lifting for you—although you also give up some of your control. For example, Spark SQL can sometimes push down or reorder operations to make your joins more efficient. On the other hand, you don’t control the partitioner for DataFrames or Datasets, so you can’t manually avoid shuffles as you did with core Spark joins.

DataFrame Joins

Joining data between DataFrames is one of the most common multi-DataFrame transformations. The standard SQL join types are all supported and can be specified as the joinType in df.join(otherDf, sqlCondition, joinType) when performing a join. As with joins between RDDs, joining with nonunique keys will result in the cross product (so if the left table has R1 and R2 with key1 and the right table has R3 and R5 with key1, you will get (R1, R3), (R1, R5), (R2, R3), and (R2, R5) in the output). While we explore Spark SQL joins we will use two example tables of pandas, Tables 4-1 and 4-2.

Warning

While self joins are supported, you must alias the fields you are interested in to different names beforehand, so they can be accessed.

Table 4-1. Table of pandas and sizes (our left DataFrame)
Name     Size
Happy    1.0
Sad      0.9
Happy    1.5
Coffee   3.0

Table 4-2. Table of pandas and zip codes (our right DataFrame)
Name     Zip
Happy    94110
Happy    94103
Coffee   10504
Tea      07012

Spark’s supported join types are “inner,” “left_outer,” “left_anti,” “right_outer,” “full_outer” (aliased as “outer”), and “left_semi.”3 With the exception of “left_semi” and “left_anti,” these join types all join the two tables, but they behave differently when handling rows that do not have keys in both tables.

The “inner” join is both the default and likely what you think of when you think of joining tables. It requires that the key be present in both tables; rows whose key is missing from either table are dropped, as shown in Example 4-6 and Table 4-3.

Example 4-6. Simple inner join
    // Inner join implicit
    df1.join(df2, df1("name") === df2("name"))
    // Inner join explicit
    df1.join(df2, df1("name") === df2("name"), "inner")
Table 4-3. Inner join of df1, df2 on name
Name     Size   Name     Zip
Coffee   3.0    Coffee   10504
Happy    1.5    Happy    94110
Happy    1.5    Happy    94103
Happy    1.0    Happy    94110
Happy    1.0    Happy    94103

Left outer joins will produce a table with all of the keys from the left table, and any rows without matching keys in the right table will have null values in the fields that would be populated by the right table. Right outer joins are the same, but with the requirements reversed. A sample left outer join is in Example 4-7, and the result is shown in Table 4-4.

Example 4-7. Left outer join
    // Left outer join explicit
    df1.join(df2, df1("name") === df2("name"), "left_outer")
Table 4-4. Left outer join df1, df2 on name
Name     Size   Name     Zip
Sad      0.9    null     null
Coffee   3.0    Coffee   10504
Happy    1.0    Happy    94110
Happy    1.0    Happy    94103
Happy    1.5    Happy    94110
Happy    1.5    Happy    94103

A sample right outer join is in Example 4-8, and the result is shown in Table 4-5.

Example 4-8. Right outer join
    // Right outer join explicit
    df1.join(df2, df1("name") === df2("name"), "right_outer")
Table 4-5. Right outer join df1, df2 on name
Name     Size   Name     Zip
Coffee   3.0    Coffee   10504
Happy    1.0    Happy    94110
Happy    1.0    Happy    94103
Happy    1.5    Happy    94110
Happy    1.5    Happy    94103
null     null   Tea      07012

To keep all records from both tables you can use the full outer join, which results in Table 4-6.
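
The syntax follows the same pattern as the other join types:

    // Full outer join explicit
    df1.join(df2, df1("name") === df2("name"), "full_outer")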

Table 4-6. Full outer join df1, df2 on name
Name     Size   Name     Zip
Sad      0.9    null     null
Coffee   3.0    Coffee   10504
Happy    1.0    Happy    94110
Happy    1.0    Happy    94103
Happy    1.5    Happy    94110
Happy    1.5    Happy    94103
null     null   Tea      07012

Left semi joins (as in Example 4-9 and Table 4-7) and left anti joins (as in Table 4-8) are the only join types that return columns from the left table alone. A left semi join is the same as filtering the left table for only the rows with keys present in the right table. A left anti join also returns only data from the left table, but only the records whose keys are not present in the right table.

Example 4-9. Left semi join
    // Left semi join explicit
    df1.join(df2, df1("name") === df2("name"), "left_semi")
Table 4-7. Left semi join
Name     Size
Coffee   3.0
Happy    1.0
Happy    1.5
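
The left anti join uses the same syntax, with the “left_anti” join type:

    // Left anti join explicit
    df1.join(df2, df1("name") === df2("name"), "left_anti")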

Table 4-8. Left anti join
Name     Size
Sad      0.9

Self joins

Self joins are supported on DataFrames, but we end up with duplicated column names. So that you can access the results, you need to alias the DataFrames to different names; otherwise you will be unable to select the columns due to the name collision (see Example 4-10). Once you’ve aliased each DataFrame, in the result you can access the individual columns for each DataFrame with dfName.colName.

Example 4-10. Self join
    val joined = df.as("a").join(df.as("b")).where($"a.name" === $"b.name")
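
Once joined, the columns from each side can be selected through their aliases (a minimal sketch assuming df has the name and size columns from Table 4-1):

    // Select name once, plus the size column from each side of the self join
    joined.select($"a.name", $"a.size", $"b.size")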

Broadcast hash joins

In Spark SQL you can see the type of join being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DataFrame should be broadcast for the join by calling broadcast on the DataFrame before joining it (e.g., df1.join(broadcast(df2), "key")). Spark also automatically uses the spark.sql.autoBroadcastJoinThreshold setting to determine whether a table should be broadcast.
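
For example (a minimal sketch; the spark session handle and the 100 MB threshold are illustrative, and broadcast comes from org.apache.spark.sql.functions):

    // Raise the size threshold under which Spark SQL broadcasts tables automatically
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)
    // Explicitly hint that df2 should be broadcast, then inspect the physical plan
    val broadcastJoined = df1.join(broadcast(df2), df1("name") === df2("name"))
    broadcastJoined.queryExecution.executedPlan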

Dataset Joins

Joining Datasets is done with joinWith, and this behaves similarly to a regular relational join, except the result is a tuple of the different record types as shown in Example 4-11. This is somewhat more awkward to work with after the join, but also does make self joins, as shown in Example 4-12, much easier, as you don’t need to alias the columns first.

Example 4-11. Joining two Datasets
    val result: Dataset[(RawPanda, CoffeeShop)] = pandas.joinWith(coffeeShops,
      $"zip" === $"zip")
Example 4-12. Self join a Dataset
    val result: Dataset[(RawPanda, RawPanda)] = pandas.joinWith(pandas,
      $"zip" === $"zip")
Note

Using a self join and a lit(true), you can produce the cartesian product of your Dataset, which can be useful but also illustrates how joins (especially self joins) can easily result in unworkable data sizes.
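
A minimal sketch of the cartesian product (lit comes from org.apache.spark.sql.functions):

    // Every panda paired with every panda, including itself; use with care.
    val cartesian: Dataset[(RawPanda, RawPanda)] = pandas.joinWith(pandas, lit(true))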

As with DataFrames you can specify the type of join desired (e.g., inner, left_outer, right_outer, left_semi), changing how records present only in one Dataset are handled. Missing records are represented by null values, so be careful.
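
For example, a left outer joinWith keeps every panda and leaves the coffee-shop side of the tuple null when there is no match (a minimal sketch following Example 4-11):

    val leftOuterResult: Dataset[(RawPanda, CoffeeShop)] = pandas.joinWith(coffeeShops,
      $"zip" === $"zip", "left_outer")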

Conclusion

Now that you have explored joins, it’s time to focus on transformations and the performance considerations associated with them.

1 As the saying goes, the cross product of big data and big data is an out-of-memory exception.

2 If the number of distinct keys is too high, you can also use reduceByKey, sort on the value, and take the top k.

3 The quotes are optional and can be left out. We use them in our examples because we think it is easier to read with the quotes present.
