PySpark Join Optimization Techniques for Dataframes (Large and Small)

Samyukta Hariharan
5 min read · Apr 12, 2023


As Data Engineers, we have all learnt that efficiency is key. The measure of efficiency is subjective, but everyone strives for it wherever possible, whether in code or in configuration.

This article should help if you are constrained in the resources you can allocate to your Spark jobs and need to speed up the code instead. It focuses mainly on joins, though there are other areas where you can optimize your code as well.

Let’s dive in!

Large Dataframe (> 8GB) Join Optimization Techniques

1. Repartitioning on Join Key

For large dataframes, the aim is to shuffle rows as little as possible. Repartition is a very powerful command when used at the right time.

https://blog.scottlogic.com/2018/03/22/apache-spark-performance.html

If we repartition both dataframes on the keys involved in their join, rows with the same key value end up in the same partition, so each partition (hosted on an executor) holds a homogeneous collection of keys. This way, the join needs far less shuffling of rows than joining the dataframes directly.

This is because a sort-merge join (Spark’s usual choice for two large tables) requires both dataframes’ keys to be sorted, while repartition distributes rows across executors without sorting. The already-distributed partitions are sorted only during the join, which is why the join turns out to be much faster.
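To make the co-location idea concrete, here is a minimal pure-Python sketch. It is a toy model and not Spark's implementation: the partition count, the sample rows and the helper `partition_by_key` are all made up for illustration.

```python
# Toy model of hash partitioning on a join key; NOT Spark's implementation.
NUM_PARTITIONS = 4  # hypothetical partition count, the same for both tables


def partition_by_key(rows, key_index, num_partitions=NUM_PARTITIONS):
    """Place each row in the partition chosen by hashing its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key_index]) % num_partitions].append(row)
    return partitions


# Made-up sample data: (customer_id, name) and (viewer_id, title)
customers = [(1, "Ana"), (2, "Ben"), (5, "Cy"), (6, "Di")]
views = [(1, "Movie A"), (5, "Movie B"), (5, "Movie C"), (6, "Movie D")]

customer_parts = partition_by_key(customers, key_index=0)
view_parts = partition_by_key(views, key_index=0)

# Rows with equal keys share a partition index, so each pair of partitions
# can be joined locally without looking at any other partition.
joined = [
    (cid, name, title)
    for c_part, v_part in zip(customer_parts, view_parts)
    for cid, name in c_part
    for vid, title in v_part
    if cid == vid
]
```

Because both tables use the same hash function and the same number of partitions, a key can only ever find its match in the partition with the same index.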

Example code for repartitioning:

# Hash-partition each dataframe on its side of the join key
df_1 = df_1.repartition("customer_id")
df_2 = df_2.repartition("viewer_id")
joined_table = df_1.join(
    df_2, df_1.customer_id == df_2.viewer_id, "inner"
)

As a reminder, repartitioning involves a shuffle too. While this approach should result in less shuffling than joining without it, it is not fool-proof and needs to be used with caution. If you find “Spill (Memory)”, “Spill (Disk)” or the shuffle size in the Spark UI to be very large, reconsider your join technique.

2. Reducing Cartesian Products

A Cartesian product occurs when you perform a Cross Join. In simple words, every row of one table is combined with every row of the other table.

So if both tables have 3 rows each, the resulting join would have 9 rows.
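The row count is easy to check with a small pure-Python sketch. The two tables here are made-up lists, not Spark dataframes.

```python
from itertools import product

# Made-up sample tables with 3 rows each
left_rows = ["a1", "a2", "a3"]
right_rows = ["b1", "b2", "b3"]

# A cross join pairs every left row with every right row
cross_joined = list(product(left_rows, right_rows))
row_count = len(cross_joined)  # 3 * 3 = 9
```

The output grows as the product of the two row counts, which is why cross joins get expensive so quickly on large tables.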

https://en.wikipedia.org/wiki/Cartesian_product

It is interesting to note that Theta Joins (joins on a condition such as >= or < rather than equality) can result in a Cartesian product too! This can be validated using the DAG in the Spark UI, which would look something like this:

https://www.bigdatainrealworld.com/wp-content/uploads/2020/12/Cartesian-product-join-Spark-stages.png

This happens because Spark cannot match rows directly on the value of a key, so it loops through every pair of rows to evaluate the inequality condition. This significantly slows down the job.

To speed this up, ask yourself: do I need the rows that do not match in this join? If the answer is no, consider joining on the equality key and applying the inequality as a filter or where condition instead. So instead of this,

joined_table = df1.join(
    df3,
    (df1.id == df3.id) & (df1.cost_price >= df3.sell_price),
    "inner",
)

You can do this

joined_table = (
    df1.join(df3, ["id"], "inner")
    .filter(df1.cost_price >= df3.sell_price)
)

This optimization technique can be applied if you have two join conditions, where one is an equality on a key and the other is a theta condition.
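As a rough illustration of why matching on the key first helps, the pure-Python sketch below compares a nested-loop evaluation of the full condition with a key lookup followed by a filter. It is a toy model, not how Spark executes joins; the sample rows and both helper functions are hypothetical.

```python
# Made-up rows: (id, cost_price) for df1 and (id, sell_price) for df3
df1_rows = [(1, 10), (2, 20), (3, 30)]
df3_rows = [(1, 8), (2, 25), (3, 30), (4, 5)]


def nested_loop_join(left, right):
    """Check the full condition against every pair of rows."""
    comparisons = 0
    out = []
    for lid, cost in left:
        for rid, sell in right:
            comparisons += 1
            if lid == rid and cost >= sell:
                out.append((lid, cost, sell))
    return out, comparisons


def key_join_then_filter(left, right):
    """Match on id through a lookup table, then filter on the inequality."""
    by_id = {}
    for rid, sell in right:
        by_id.setdefault(rid, []).append(sell)
    comparisons = 0
    out = []
    for lid, cost in left:
        for sell in by_id.get(lid, []):
            comparisons += 1
            if cost >= sell:
                out.append((lid, cost, sell))
    return out, comparisons


slow_rows, slow_checks = nested_loop_join(df1_rows, df3_rows)
fast_rows, fast_checks = key_join_then_filter(df1_rows, df3_rows)
```

Both approaches return the same rows for an inner join, but the nested loop evaluates the condition for all 12 row pairs while the key lookup only evaluates it for the 3 pairs that share an id.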

3. Reordering Chained Joins

This actually happened — I had to join three tables, A, B and C. B was a massive table. When I chained the joins as

A → Joined with B → Joined with C

my code took 5+ hours!

After investigating a bit, I learnt that chaining your joins so that the heaviest table is joined last can significantly reduce the shuffling the joins require. I changed the order to

A → Joined with C → Joined with B

And my code finished in 11 min!

The order of joins can significantly impact the shuffling required to perform them. Joining the smaller dataframes A and C first means the large dataframe B is shuffled only once, in the final join.

If you instead join the large dataframe B with A first, the result is likely to be a large dataframe as well, which then has to be shuffled and joined again with C. That is 2 joins involving B-sized data!
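A back-of-the-envelope way to compare the two orders is to count the rows fed into each join. The sketch below is only a toy cost model: the row counts, the assumed intermediate result sizes and the function `rows_entering_joins` are all hypothetical, and real shuffle cost depends on much more than row counts.

```python
def rows_entering_joins(first, second, first_result_rows, third):
    """Toy cost: total rows fed into the two joins of a chain."""
    return (first + second) + (first_result_rows + third)


# Hypothetical row counts, chosen only for illustration
A, B, C = 1_000, 1_000_000, 2_000
A_JOIN_B = 1_000_000  # assume joining with B keeps the result B-sized
A_JOIN_C = 2_000      # assume joining the two small tables stays small

cost_b_first = rows_entering_joins(A, B, A_JOIN_B, C)  # A -> B -> C
cost_b_last = rows_entering_joins(A, C, A_JOIN_C, B)   # A -> C -> B
```

Under these assumptions, joining B first feeds 2,003,000 rows into joins, while joining B last feeds 1,005,000, roughly half as many.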

Small Dataframe (≤ 8GB) Join Optimization Techniques

Broadcasting is hands down the best optimization technique for joins involving small dataframes. It sends a read-only copy of the dataframe to every executor, and each executor keeps its copy in memory.

from pyspark.sql.functions import broadcast
joined_df = larger_df\
    .join(broadcast(smaller_df), ["identifier"], "left")

Using broadcast during joins is particularly useful. When the smaller dataframe is broadcast, Spark only has to split the larger dataframe into pieces across the executors. Each executor can then join its piece of the large dataframe with its local copy of the small one, so the join involves much less shuffling.
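The mechanics can be sketched in pure Python. This is a toy model and not Spark's implementation: the partitions, the sample rows and the helper `left_join_partition` are made up for illustration.

```python
# Toy model of a broadcast hash join; NOT Spark's implementation.
# Made-up data: the large table is already split into partitions.
large_partitions = [
    [("u1", 10), ("u2", 20)],
    [("u3", 30), ("u1", 40)],
]
small_table = [("u1", "gold"), ("u3", "silver")]

# "Broadcast": every partition gets the same read-only lookup table.
lookup = dict(small_table)


def left_join_partition(partition, lookup):
    """Join one partition locally; unmatched rows get None (left join)."""
    return [(key, value, lookup.get(key)) for key, value in partition]


joined_partitions = [left_join_partition(p, lookup) for p in large_partitions]
```

No row of the large table has to move between partitions here, since each partition already has everything it needs to complete its share of the join.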

https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/

Beware of using broadcast on large dataframes, since it can cause memory errors. Common symptoms of broadcasting a dataframe that is too large are OOM (Out of Memory) errors and executors getting killed unexpectedly.

Another thing to note is that broadcasting the left table in a left join, or the right table in a right join, may not be very useful. In a left join, every row of the left table appears in the result whether or not it matches, while only the matching rows are taken from the right table. The right table is therefore the one worth broadcasting, and for a left outer join Spark’s broadcast hash join builds its hash table from the right side. The inverse applies for a right join.

To conclude, as a checklist, some techniques that can help optimize your PySpark joins are:

  1. Repartitioning each table on its join key(s) prior to performing the join
  2. Avoiding Cartesian Products wherever possible
  3. Reordering chained joins to make sure the largest table is joined last
  4. Broadcasting small tables whenever possible

PS: The reason 8GB was chosen as the dividing line between Large and Small dataframes is that Spark’s limit for broadcasting is 8GB :)

Thanks for reading!
