Today, I will show you a very simple way to join two CSV files in Spark, and along the way explain the two most basic methods Spark employs for joining DataFrames, down to how Spark distributes the data within the cluster. You call the join method from the left-side DataFrame object, such as df1.join(df2, df1.col1 == df2.col1, 'inner'). Joins are one of the fundamental operations when developing a Spark job, and Spark SQL offers several join strategies, among them broadcast joins (also known as map-side joins), which are meant to optimize join queries over large distributed datasets.

SET spark.databricks.optimizer.rangeJoin.binSize=5 applies to any join with a range condition; the range join optimization is performed for joins whose condition can be interpreted as a point-in-interval or interval-overlap range join.

The Kryo serializer uses a compact binary format and can process data roughly 10x faster than the Java serializer. Suppose you have a situation where one data set is very small and the other is quite large, and you want to join the two: this is exactly where broadcast joins shine, since they are perfect for joining a large DataFrame with a small DataFrame.

We know that Spark comes with three types of API to work with: RDD, DataFrame and Dataset. Dataset is highly type safe, uses encoders, and relies on Tungsten for serialization into a binary format.

Why and when bucketing? If a business use case requires a join on tables with very high cardinality on the join column (millions, billions or even trillions of distinct values), and that join has to happen multiple times in the Spark application, bucketing is the best optimization. In a sort-merge join, the first step is to sort the datasets and the second is to merge the sorted data in each partition by iterating over the elements and, according to the join key, joining the rows having the same value.

You can mark an RDD to be persisted using the persist() or cache() methods. The most frequent performance problem when working with the RDD API is using transformations that are inadequate for the specific use case; for example, reduceByKey is faster than groupByKey because it pre-aggregates before the shuffle. Parallelism plays a very important role while tuning Spark jobs, and Spark SQL can use the umbrella configuration spark.sql.adaptive.enabled to turn adaptive query execution on or off.

The classic implementation of a join in Spark is the shuffled hash join (since Spark 2.3 the default is sort-merge, as discussed below). At Data Kare Solutions we often found ourselves joining two big tables (data frames) when dealing with Spark. Data skew is a condition in which a table's data is unevenly distributed among partitions in the cluster.
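To make that opening example concrete, here is a minimal sketch of joining two CSV files; the paths and the customer_id column are hypothetical:

    val customers = spark.read.option("header", "true").csv("/data/customers.csv")
    val orders    = spark.read.option("header", "true").csv("/data/orders.csv")
    // The join is called from the left-side DataFrame, exactly as described above.
    val joined = customers.join(orders, customers("customer_id") === orders("customer_id"), "inner")
    joined.show()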
Dynamic partition pruning improves upon the existing capabilities of Spark 2.4.2, which only supports pushing down static predicates that can be resolved at plan time. We all know that during the development of any program, taking care of performance is equally important. A Spark job can be optimized by many techniques, so let's dig deeper into those techniques one by one.

There are two ways to manage parallelism: repartition and coalesce. The repartition method gives you an equal number of partitions, but it shuffles a lot, so it is not advisable when you simply want to reduce the number of partitions; coalesce will generally reduce the number of partitions and creates far less shuffling of data. Later we will also look at a more complicated transformation graph that includes a join transformation with multiple dependencies.

Spark supports many formats, such as CSV, JSON, XML, Parquet, ORC and Avro. A majority of the classic optimization rules are based on heuristics, i.e., they only account for a query's structure and ignore the properties of the data being processed, which severely limits their applicability.

While coding in Spark, a user should always try to avoid shuffle operations, because a shuffle degrades performance, and with heavy shuffling a job can run out of memory. The shuffled hash join ensures that data on each partition contains the same keys by partitioning the second dataset with the same default partitioner as the first, so matching keys from both datasets land in the same partition; the number of partitions in the resulting DataFrame is different from the original DataFrame's partitions.

In a shuffle join, records from both tables are transferred through the network to executors, which is suboptimal when one table is substantially bigger than the other. For relations smaller than spark.sql.autoBroadcastJoinThreshold, Spark uses a broadcast join instead. In the depths of Spark SQL there lies the Catalyst optimizer: Spark SQL supports the same basic join types as core Spark, but the optimizer is able to do more of the heavy lifting for you, although you also give up some control.

A broadcast variable makes your small data set available on each node, so that data is processed locally. A Parquet file is native to Spark and carries its metadata along with its footer. JVM garbage collection can be a problem when you have a large collection of unused objects. A ShuffleHashJoin is the most basic way to join tables in Spark, and we'll diagram how Spark shuffles the dataset to make this happen; Cartesian joins and broadcast hash joins are covered further below. This is actually a pretty cool feature, but it is a subject for another blog post. Due to its fast, easy-to-use capabilities, Apache Spark helps enterprises process data faster, solving complex data problems quickly. A relation is a table, view, or a subquery. This post explains how to do a simple broadcast join and how the broadcast() function helps Spark optimize the execution plan.
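To make the repartition/coalesce trade-off visible, here is a small sketch; df stands for any existing DataFrame and the partition counts are arbitrary:

    val widened  = df.repartition(200)   // full shuffle, evenly sized partitions
    val narrowed = df.coalesce(10)       // narrow dependency, merges partitions without a full shuffle
    println(widened.rdd.getNumPartitions)  // 200
    println(narrowed.rdd.getNumPartitions) // at most 10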
Spark SQL can cache tables using an in-memory columnar format by calling spark.catalog.cacheTable("tableName") or dataFrame.cache(). Spark SQL will then scan only the required columns and will automatically tune compression to minimize memory usage and GC pressure. The preference for sort-merge joins can be turned off via the internal parameter spark.sql.join.preferSortMergeJoin, which defaults to true.

When you have one dataset that is much smaller than the other, a broadcast join is highly recommended. In a broadcast join, the smaller table is sent to the executors; this type of join broadcasts one side to all executors, and so requires that side to be small enough to fit in each executor's memory. For example, Apache Hive on Spark uses this transformation inside its join implementation.

Spark performance tuning and optimization is a bigger topic that consists of several techniques and configurations (resources, memory and cores); here I've covered some of the best guidelines I've used to improve my workloads, and I will keep updating them. Besides enabling CBO, another way to optimize joining datasets in Spark is by using the broadcast join. When CBO is enabled, Spark joins the fact tables with their corresponding date_dim dimension table first, before attempting any fact-to-fact joins. And when you have a small dataset which needs to be used multiple times in your program, cache that dataset.

Is there a way to avoid all this shuffling? Spark will choose the broadcast hash join algorithm if one side of the join is smaller than autoBroadcastJoinThreshold, which is 10 MB by default. There are various ways Spark estimates the size of both sides of the join, depending on how the data is read, whether statistics are computed in the metastore, and whether the cost-based optimization feature is turned on or off.

Datasets use encoders for their serialization, as in this snippet:

    import spark.implicits._
    val df = spark.read.json("examples/src/main/resources/people.json")
    case class Person(name: String, age: Long)
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    // DataFrames can be converted to a Dataset by providing a class; mapping is done by name.

Serialization plays an important role in the performance of any distributed application, and by default Spark uses the Java serializer on the JVM platform. Broadcasting also plays an important role while tuning Spark jobs, and high shuffling may give rise to an OutOfMemoryError. Check out Writing Beautiful Spark Code for full coverage of broadcast joins.
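In the meantime, here is a minimal broadcast-join sketch, assuming a large facts DataFrame and a small dim DataFrame joined on a hypothetical dim_id column:

    import org.apache.spark.sql.functions.broadcast
    val joined = facts.join(broadcast(dim), Seq("dim_id"))
    joined.explain() // the physical plan should show a BroadcastHashJoin
    // The automatic threshold can also be raised or lowered (10 MB is the default):
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)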
To avoid such an out-of-memory error, a user should increase the level of parallelism. Spark can also use another serializer called Kryo, which performs better than the default Java serializer.

From Spark 2.3 onward, sort-merge is the default join algorithm in Spark, and users can control broadcast joins via the spark.sql.autoBroadcastJoinThreshold configuration. While dealing with data we have all dealt with different kinds of joins, be it inner, outer, left or (maybe) left-semi, and this article covers the different join strategies Spark employs to perform them. Spark 3.0 AQE optimization features include the ability to optimize the join strategy at runtime based on the actual join relation size.

If we apply RDD.cache() the data is always stored in memory, whereas with RDD.persist() part of the data can be stored in memory and part on disk. With Amazon EMR 5.24.0 and 5.25.0, you can enable dynamic partition pruning by setting the Spark property spark.sql.dynamicPartitionPruning.enabled from within Spark or when creating clusters.

Broadcast joins in Apache Spark are an optimization technique aimed at Spark programmers who know the fundamentals: how data is split, how Spark generally works as a computing engine, plus some essential DataFrame APIs. Commonly cited Spark optimization techniques are 1) persist/unpersist, 2) shuffle partition tuning, 3) pushing down filters, and 4) broadcast joins. Broadcast joins cannot be used when joining two large DataFrames. One of the challenges of working with PySpark (the Python shell of Apache Spark) is that it is Python and Pandas, but with some subtle differences. To use a broadcast join, write df1.join(broadcast(df2)); this is one of the simple ways to improve the performance of a Spark job, because join operations in Apache Spark are often the biggest source of performance problems and even full-blown exceptions.

Spark reads and writes the common file formats directly:

    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
    val parquetFileDF = spark.read.parquet("people.parquet")
    val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
    usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

To experiment with the Spark 3.0 skew-join optimization, start from a session configured so that a broadcast join is never chosen:

    object SkewedJoinOptimizationConfiguration {
      val sparkSession = SparkSession.builder()
        .appName("Spark 3.0: Adaptive Query Execution - join skew optimization")
        .master("local[*]")
        .config("spark.sql.adaptive.enabled", true)
        // First, disable all configs that would create a broadcast join
        .config("spark.sql.autoBroadcastJoinThreshold", "1")
        .config("spark.sql.join.preferSortMergeJoin", true)
        .getOrCreate()
    }

Broadcast variables and accumulators, Spark's two shared-variable types, are created like this:

    val broadcastVar = sc.broadcast(Array(1, 2, 3))
    val accum = sc.longAccumulator("My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
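Since persist and cache just came up, here is a small sketch of the difference in storage level; the RDD and its path are hypothetical:

    import org.apache.spark.storage.StorageLevel
    val events = spark.sparkContext.textFile("/data/events.txt")
    // cache() on an RDD is shorthand for persist(StorageLevel.MEMORY_ONLY)
    val reused = events.persist(StorageLevel.MEMORY_AND_DISK) // partitions that do not fit in memory spill to disk
    reused.count() // the first action materializes the persisted data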
For example, Spark SQL can sometimes push down or reorder operations to make your joins more efficient. With cost-based optimization, on the other hand, Spark creates an optimal join plan that reduces intermediary data size (shown below). Shuffles are heavy operations that consume a lot of memory, and the number of shuffle partitions in Spark is static by default; it does not change with different data sizes. Spark SQL is a big data processing tool for structured data query and analysis, and with Amazon EMR 5.26.0 dynamic partition pruning is enabled by default.

A practical checklist for join-heavy jobs: understand join mechanics and why they are expensive; use broadcast joins, or know what to do when you join a large and a small DataFrame; apply pre-join optimizations such as column pruning, pre-partitioning and bucketing for fast access; fix data skews, "straggling" tasks and OOMs; disable DEBUG and INFO logging; and use broadcast joins where possible, but do not overuse them.

In one of our Big Data / Hadoop projects we needed an easy way to join two CSV files in Spark; we explored a lot of techniques and found the broadcast join the easiest. The syntax to use the broadcast variable is df1.join(broadcast(df2)): the second DataFrame here is very small, so we keep it as a broadcast variable and the small data set fits comfortably into it. All values involved in a range join condition must be of a numeric type (integral, floating point, decimal), DATE, or TIMESTAMP.

Initially, Spark SQL starts with a relation to be computed. Persist and cache mechanisms store a data set in memory whenever there is a requirement, typically a small data set used multiple times in your program, while broadcast variables make small datasets available on the nodes locally. Spark SQL joins come with more optimization by default (thanks to DataFrames and Datasets); however, there are still performance issues to consider. A BroadcastHashJoin is a very common way for Spark to join two tables under the special condition that one of the tables is small.
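Returning to the cost-based optimizer mentioned above: it can only participate in join planning once statistics exist. A minimal sketch, assuming a hypothetical sales table already registered in the catalog:

    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, amount")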
In a sort-merge join, before the sort, Spark's engine tries to discard data that will not be used in the join, such as nulls and unused columns. Cartesian joins are a harder problem: we'll describe why they are difficult, what you need to do to make them work, and what to look out for.

To switch to the Kryo serializer and register your classes with it:

    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

While performing a join, if one of the DataFrames is small enough, Spark will perform a broadcast join. Our transformations also include many ByKey operations, and ByKey operations generate a lot of shuffle. Join is, in general, an expensive operation, so pay attention to the joins in your application in order to optimize them; for example, Spark can convert a sort-merge join into a broadcast hash join, which performs better when one side of the join is small enough to fit in memory. In an ideal situation we try to keep GC overhead below 10% of heap memory.

Spark SQL deals with both SQL queries and the DataFrame API, and this session will cover the different ways of joining tables in Apache Spark, including one-to-many joins. Under the above background, this paper aims to improve the execution efficiency of Spark SQL. These factors for Spark optimization, if used properly, can significantly improve job performance. Spark introduced three types of API to work with, RDD, DataFrame and Dataset, where RDD is used for low-level operations with less optimization, and Spark optimization in general helps with in-memory data computations.

All values involved in the range join condition must be of the same type. In this tutorial you will learn the different join syntaxes and join types on DataFrames and Datasets using Scala examples, along with a few things you need to pay attention to when using a broadcast join. The term optimization refers to a process in which a system is modified so that it works more efficiently or uses fewer resources, and Spark SQL is the most technically involved component of Apache Spark. During execution, Spark SQL may write intermediate data to disk multiple times, which reduces its execution efficiency. Finally, a different bin size set through a range join hint always overrides the one set through the configuration.
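A sketch of a point-in-interval range join, with hypothetical points and ranges DataFrames; the binSize setting (like the hint) applies to the Databricks Runtime optimizer mentioned earlier:

    spark.conf.set("spark.databricks.optimizer.rangeJoin.binSize", "10")
    // Point-in-interval condition: ranges.start <= points.p < ranges.end
    val joined = points.join(
      ranges,
      points("p") >= ranges("start") && points("p") < ranges("end"))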
Currently, the existing Spark SQL optimization works by broadcasting the usually small (after filtering and projection) dimension tables, to avoid costly shuffling of the fact table and the "reduce" operations based on the join keys. Every partition, and therefore every task, requires a single core for processing, and the bottleneck for these computations can be CPU, memory or any other resource in the cluster. The first phase of Spark SQL optimization is analysis. If you aren't joining two tables strictly by key, but instead checking a condition between your tables, you may need to provide some hints to Spark SQL to get the join to run well.

Configuration of in-memory caching can be done using the setConf method on SparkSession or by running SET key=value commands in SQL. Serialization is the process of converting an in-memory object into another format that can be stored in a file or sent over the network. Sorting within partitions also acts as a vital building block in the secondary sort pattern, in which you want to group records by key and then, when iterating over the values that correspond to a key, have them show up in a particular order.

Skew handling is implemented as a rule in the new adaptive execution framework introduced in SPARK-23128; once a relation is marked as skewed, all joins with that relation use the skew join optimization. This might possibly stem from many users' familiarity with SQL querying languages and their reliance on query optimizations. A sort-merge join is composed of two steps, sort and then merge, and star join query optimizations aim to optimize the performance and resource usage of star-schema joins.

A Dataset can also be produced by reading a file and mapping it onto a case class, where mapping is done by name:

    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]

Spark comes with two types of advanced shared variables, broadcast variables and accumulators, and broadcasting plays an important role while tuning your Spark job. A Spark job can also be optimized by choosing Parquet files with snappy compression. Underneath, a Spark job runs on the JVM, so garbage collection becomes problematic when you hold a large collection of unused objects; the first step in tuning garbage collection is to collect statistics by passing the verbose option to spark-submit.

Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that uses runtime statistics to choose the most efficient query execution plan. Spark jobs with correlated intermediate data need to read the same input from disk repeatedly, resulting in redundant disk I/O cost. Instead of groupByKey, a user should go for reduceByKey, because groupByKey creates a lot of shuffling which hampers performance, while reduceByKey does not shuffle the data as much. Let us demonstrate this with a simple example.
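A minimal sketch of that comparison, using a few made-up key/value pairs:

    val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
    // groupByKey ships every value across the network before summing
    val viaGroup = pairs.groupByKey().mapValues(_.sum)
    // reduceByKey pre-aggregates within each partition, so far less data is shuffled
    val viaReduce = pairs.reduceByKey(_ + _)
    viaReduce.collect().foreach(println) // (a,2), (b,1)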
AQE is disabled by default. A broadcast join is best suited for smaller data sets, or where one side of the join is much smaller than the other; a sort-merge join, by contrast, is best suited for joining two large data sets. Bucketing is an optimization technique in Spark SQL that uses buckets and bucketing columns to determine the data partitioning.

Below are some tips: join order matters, so start with the most selective join; whenever any ByKey operation is used, partition the data correctly; and prepare your datasets, because even though there is an optimization engine that tries to improve resource allocation, datasets still need to be prepared to get efficient performance. With the information from skew hints, Spark can construct a better query plan, one that does not suffer from data skew. That's why, for the sake of the skew experiment, automatic broadcasting was turned off in the configuration shown earlier: right now we are interested in Spark's behavior during a standard join.
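Since bucketing keeps coming up, here is a minimal sketch of bucketing both sides of a frequently repeated join on the join key; the DataFrames, table names, bucket count and column are hypothetical, and saveAsTable assumes a session with a catalog/warehouse available:

    ordersDf.write.bucketBy(64, "customer_id").sortBy("customer_id").saveAsTable("orders_bucketed")
    customersDf.write.bucketBy(64, "customer_id").sortBy("customer_id").saveAsTable("customers_bucketed")
    val joined = spark.table("orders_bucketed")
      .join(spark.table("customers_bucketed"), "customer_id")
    joined.explain() // with matching bucket counts there should be no Exchange before the sort-merge join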
It is worth knowing about these optimizations before you start working with joins. Taken together, they let you write joins in Spark SQL that scale and stay fast: keep garbage-collection overhead within bounds, lean on the Kryo serializer where serialization cost matters, and reach for broadcast variables when one side of the join is small enough to fit on every node.
To accomplish ideal performance in Sort Merge join: • make sure the partitions have co-located!