Recently, I have been wondering about Spark’s deserialization performance, and especially about this question: when you have a nullable column in a DataFrame, is it better to deserialize it to an Option or to a nullable type? Let’s answer this question in this blog post.
The benchmark
To answer this question, I define the following benchmark: I create simple input data and read it with three Spark applications that each select a column, replace its null values with a default value, and write the result to Parquet.
I test the performance of the three following Spark applications:
- the first application does not deserialize and performs the transformations using Spark built-in functions
- the second application deserializes to a nullable type and performs the transformations with Scala code
- the third application deserializes to an Option and performs the transformations with Scala code
Creating simple input data
I decided to generate a CSV file with 11 columns: the first column is an id column, and the other 10 columns are data columns. In the data columns, I put the same value as in the id column, except that on every 100th row I leave them empty. The code to generate the file is:
import java.io.{BufferedWriter, File, FileWriter}
val file = new File("/tmp/bigfile.csv")
val bw = new BufferedWriter(new FileWriter(file))
// write 50 million rows: the id followed by 10 data columns, left empty on every 100th row
for (i <- 1 to 50000000) {
  val stringToWrite = if (i % 100 == 0) {
    s"$i,,,,,,,,,,\n"
  } else {
    s"$i,$i,$i,$i,$i,$i,$i,$i,$i,$i,$i\n"
  }
  bw.write(stringToWrite)
}
bw.close()
It generates a CSV file of about 5 GB.
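As a sanity check (not part of the benchmark, and not in the original code), the size and row count of the generated file can be verified with plain Scala, for example:

val generated = new java.io.File("/tmp/bigfile.csv")
println(s"Size on disk: ${generated.length() / (1024 * 1024)} MB")
val source = scala.io.Source.fromFile(generated)
try println(s"Row count: ${source.getLines().size}") // expected: 50000000
finally source.close()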
Common code among the three Spark applications
All three Spark applications use a SparkSession with the same configuration:
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("delta-stream-read")
  .config("spark.ui.enabled", "false")
  .config("spark.driver.host", "localhost")
  .config("spark.sql.shuffle.partitions", "1")
  .config("spark.default.parallelism", "1")
  .getOrCreate()
Also, all Spark applications read the input data with the same method:
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.DataFrame
val schema: StructType = StructType(Seq(
  StructField("id", IntegerType),
  StructField("value1", IntegerType),
  StructField("value2", IntegerType),
  StructField("value3", IntegerType),
  StructField("value4", IntegerType),
  StructField("value5", IntegerType),
  StructField("value6", IntegerType),
  StructField("value7", IntegerType),
  StructField("value8", IntegerType),
  StructField("value9", IntegerType),
  StructField("value10", IntegerType)
))

def read(path: String): DataFrame = spark.read.format("csv")
  .schema(schema)
  .load(path)
  .select("value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", "value10")
First application: no deserialization
The first application does not deserialize the data and uses Spark built-in functions to replace null values with the default value:
for (i <- 1 to 20) {
  spark.time(
    read("/tmp/bigfile.csv")
      .na.fill(0, Seq("value5"))
      .select("value5")
      .write
      .mode("overwrite")
      .parquet("/tmp/notSerialized")
  )
}
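As a side note (an expectation, not captured output), the physical plan of this version can be inspected with explain(); since no typed map is involved, it should not contain DeserializeToObject or SerializeFromObject nodes:

// inspect the physical plan of the built-in-function version
read("/tmp/bigfile.csv")
  .na.fill(0, Seq("value5"))
  .select("value5")
  .explain()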
Second application: deserialization to a nullable type
The second application deserializes the columns to java.lang.Integer and uses Scala code to replace null values with the default value:
import spark.implicits._
val zero: Integer = Integer.valueOf(0)
for (i <- 1 to 20) {
  spark.time(
    read("/tmp/bigfile.csv")
      .as[(Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer)]
      .map(value => if (value._5 == null) zero else value._5)
      .write
      .mode("overwrite")
      .parquet("/tmp/serializedToInteger")
  )
}
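For comparison (again an expectation rather than captured output), inspecting the plan of this typed version with explain() should show the extra DeserializeToObject, MapElements and SerializeFromObject nodes introduced by the map on Scala objects:

// inspect the physical plan of the typed version (reuses zero and spark.implicits._ from above)
read("/tmp/bigfile.csv")
  .as[(Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer)]
  .map(value => if (value._5 == null) zero else value._5)
  .explain()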
Third application: deserialization to an Option
The third application deserializes the columns to Option[Int] and uses Scala code to replace None values with the default value:
import spark.implicits._
for (i <- 1 to 20) {
  spark.time(
    read("/tmp/bigfile.csv")
      .as[(Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int])]
      .map(_._5.getOrElse(0))
      .write
      .mode("overwrite")
      .parquet("/tmp/serializedToOption")
  )
}
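Note that spark.time only prints the elapsed time to stdout. If you prefer to collect the measurements programmatically (for example to compute the min, max and average yourself), a small helper like this hypothetical timed function can be used instead; it is a sketch, not part of the original benchmark:

// measure the wall-clock time of an arbitrary block, in milliseconds
def timed[T](body: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = body
  (result, (System.nanoTime() - start) / 1000000)
}

val (_, elapsedMs) = timed {
  read("/tmp/bigfile.csv")
    .as[(Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int])]
    .map(_._5.getOrElse(0))
    .write
    .mode("overwrite")
    .parquet("/tmp/serializedToOption")
}
println(s"Run took $elapsedMs ms")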
Run
I ran each application 20 times on my local machine with Spark 3.0.1 and got the following results. All times in the table below are in seconds.
| application | min | max | average | median |
|---|---|---|---|---|
| without deserialization | 28.608 | 32.363 | 29.933 | 29.709 |
| nullable deserialization | 43.665 | 47.773 | 46.300 | 46.694 |
| option deserialization | 45.076 | 50.377 | 47.194 | 46.702 |
Analysis
Run times are very similar between deserialization to a nullable type and deserialization to an Option, with the nullable type being slightly faster. The application without deserialization is, of course, much faster than the two applications with deserialization.
Conclusion
Changing the deserialization type from Option to a nullable type doesn’t significantly improve the performance of a Spark application. The best way to improve performance is to not deserialize at all and to use Spark’s built-in functions to transform the DataFrame.
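For example, the null replacement used in this benchmark can also be written with other built-in functions such as coalesce and lit instead of na.fill; this equivalent formulation stays entirely in the untyped DataFrame API (the output path below is made up for the illustration and was not benchmarked):

import org.apache.spark.sql.functions.{coalesce, col, lit}

read("/tmp/bigfile.csv")
  .select(coalesce(col("value5"), lit(0)).as("value5"))
  .write
  .mode("overwrite")
  .parquet("/tmp/notSerializedCoalesce")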
The source code used for this benchmark is available here: https://github.com/vincentdoba/blog-examples/tree/master/20201112-deserialization-performance