Recently, I was wondering about Spark’s deserialization performance, and especially about this question: when you have a nullable column in a DataFrame, is it better to deserialize it to an Option or to a nullable type? Let’s answer this question in this blog post.

The benchmark

To answer this question, I define the following benchmark: I create simple input data and read it with three Spark applications that each select a column, replace its null values with a default value, and write the result to Parquet.

I test the performance of the following three Spark applications:

  • the first application does not deserialize and performs the transformation with Spark built-in functions

  • the second application deserializes to a nullable type and performs the transformation with Scala code

  • the third application deserializes to an Option and performs the transformation with Scala code

Creating simple input data

I decided to generate a CSV file with 11 columns: the first column is an id column, and the other 10 columns are data columns. The data columns contain the same value as the id column, except that on every 100th row I don’t set any value. The code to generate the file is:

import java.io.{BufferedWriter, File, FileWriter}

val file = new File("/tmp/bigfile.csv")
val bw = new BufferedWriter(new FileWriter(file))
for (i <- 1 to 50000000) {
  // every 100th row, leave the 10 data columns empty
  val stringToWrite = if (i % 100 == 0) {
    s"$i,,,,,,,,,,\n"
  } else {
    s"$i,$i,$i,$i,$i,$i,$i,$i,$i,$i,$i\n"
  }
  bw.write(stringToWrite)
}
bw.close()

It generates a CSV file of about 5 GB.
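To make the shape of the data concrete, rows 99 to 101 of the generated file look like this (one id column followed by 10 data columns, with the data columns left empty on every 100th row):

99,99,99,99,99,99,99,99,99,99,99
100,,,,,,,,,,
101,101,101,101,101,101,101,101,101,101,101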

Common code among the three Spark applications

All Spark applications use a SparkSession with the same configuration:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("delta-stream-read")
  .config("spark.ui.enabled", "false")
  .config("spark.driver.host", "localhost")
  .config("spark.sql.shuffle.partitions", "1")
  .config("spark.default.parallelism", "1")
  .getOrCreate()

Also, all Spark applications read the input data with the same method:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.DataFrame

val schema: StructType = StructType(Seq(
  StructField("id", IntegerType),
  StructField("value1", IntegerType),
  StructField("value2", IntegerType),
  StructField("value3", IntegerType),
  StructField("value4", IntegerType),
  StructField("value5", IntegerType),
  StructField("value6", IntegerType),
  StructField("value7", IntegerType),
  StructField("value8", IntegerType),
  StructField("value9", IntegerType),
  StructField("value10", IntegerType)
))

def read(path: String): DataFrame = spark.read.format("csv")
  .schema(schema)
  .load(path)
  .select("value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", "value10")

First application: no deserialization

The first application does not deserialize the data and uses Spark built-in functions to replace null values with a default value:

for (i <- 1 to 20) {
  spark.time(
    read("/tmp/bigfile.csv")
      .na.fill(0, Seq("value5"))
      .select("value5")
      .write
      .mode("overwrite")
      .parquet("/tmp/notSerialized")
  )
}
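Note that na.fill is only one of several built-in ways to express this; for instance, a roughly equivalent version using coalesce (the output path here is just illustrative) would be:

import org.apache.spark.sql.functions.{coalesce, col, lit}

read("/tmp/bigfile.csv")
  .select(coalesce(col("value5"), lit(0)).as("value5"))
  .write
  .mode("overwrite")
  .parquet("/tmp/notSerializedCoalesce")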

Second application: deserialization to a nullable type

The second application deserializes columns to java.lang.Integer and uses Scala code to replace null values with a default value:

import spark.implicits._

val zero: Integer = Integer.valueOf(0)

for (i <- 1 to 20) {
  spark.time(
    read("/tmp/bigfile.csv")
      .as[(Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer)]
      .map(value => if (value._5 == null) zero else value._5)
      .write
      .mode("overwrite")
      .parquet("/tmp/serializedToInteger")
  )
}
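For readability, the tuple could also be replaced by a case class with java.lang.Integer fields; a minimal sketch (the class name Row10 is just an example) would be:

case class Row10(value1: Integer, value2: Integer, value3: Integer, value4: Integer, value5: Integer,
                 value6: Integer, value7: Integer, value8: Integer, value9: Integer, value10: Integer)

read("/tmp/bigfile.csv")
  .as[Row10]
  .map(row => if (row.value5 == null) zero else row.value5)

This is only a readability change: the encoder work done by Spark is the same, so the performance should be comparable to the tuple version.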

Third application: deserialization to an Option

The third application deserializes columns to Option[Int] and uses Scala code to replace None values with a default value:

import spark.implicits._

for (i <- 1 to 20) {
  spark.time(
    read("/tmp/bigfile.csv")
      .as[(Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int])]
      .map(_._5.getOrElse(0))
      .write
      .mode("overwrite")
      .parquet("/tmp/serializedToOption")
  )
}

Run

I ran each application 20 times on my local machine with Spark 3.0.1 and got the following results. All times in the table below are in seconds.

application                 min      max      average   median
without deserialization     28.608   32.363   29.933    29.709
nullable deserialization    43.665   47.773   46.300    46.694
option deserialization      45.076   50.377   47.194    46.702

Analysis

Run times are very similar between deserialization to a nullable type and deserialization to an Option, with the nullable type being slightly faster. The application without deserialization is, unsurprisingly, much faster than the two applications that deserialize.
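The gap is easier to understand by looking at the physical plan: for the typed versions, Spark has to wrap the lambda between a deserialization and a serialization step, which the built-in-function version avoids entirely. A quick way to see this (exact operator names may vary slightly between Spark versions):

read("/tmp/bigfile.csv")
  .as[(Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int], Option[Int])]
  .map(_._5.getOrElse(0))
  .explain()
// the plan contains DeserializeToObject, MapElements and SerializeFromObject nodes:
// per-row conversions that the version using only built-in functions does not pay for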

Conclusion

Changing the deserialization type from Option to a nullable type does not significantly improve the performance of a Spark application. The best way to improve performance is to not deserialize at all and to use Spark’s built-in functions to transform the DataFrame.