Yesterday, I ran into a misleading behavior of Spark’s DataFrameReader when reading Parquet data. If a parquet data directory contains several parquet files with different schemas, and we neither provide a schema nor use the mergeSchema option, the inferred schema depends on the listing order of the parquet files in the directory.

The setup

I am reading data stored in Parquet format. The data is split into two parquet files, each with a different schema. The first parquet file contains the data below:

+---+-------+
|id |column1|
+---+-------+
|1  |value11|
+---+-------+

And the second one contains the data below:

+---+-------+-------+
|id |column1|column2|
+---+-------+-------+
|2  |value12|value22|
+---+-------+-------+
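
Here is a minimal sketch of how two such files might end up in the same directory; the column types and exact write calls are my assumption, the actual generation code is in the repository linked at the end of this post:

import spark.implicits._

// first file: only id and column1
Seq((1, "value11"))
  .toDF("id", "column1")
  .write.parquet("target/test-data")

// second file: an extra column2, appended as a new part file in the same directory
Seq((2, "value12", "value22"))
  .toDF("id", "column1", "column2")
  .write.mode("append").parquet("target/test-data")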

So I have my two files in a directory, as you can see here:

% ls -l target/test-data
total 8
-rw-r--r-- 1 vincent vincent 666 2020-10-25 15:06 part-00000-b6aaae26-9ca4-42fe-bef2-2063b454ec5f-c000.snappy.parquet
-rw-r--r-- 1 vincent vincent 911 2020-10-25 15:07 part-00000-d6d836de-0de8-424a-91cc-869ebd914cd2-c000.snappy.parquet
-rw-r--r-- 1 vincent vincent   0 2020-10-25 15:07 _SUCCESS

Loading parquet files

If I load the parquet data with a plain spark read call, I get:

> spark.read.parquet("target/test-data").show(false)

+---+-------+
|id |column1|
+---+-------+
|2  |value12|
|1  |value11|
+---+-------+

So far so good.

Let’s do a little experiment: I change the listing order of the parquet files in the directory target/test-data by renaming them:

% mv target/test-data/part-00000-b6aaae26-9ca4-42fe-bef2-2063b454ec5f-c000.snappy.parquet target/test-data/02-data.snappy.parquet
% mv target/test-data/part-00000-d6d836de-0de8-424a-91cc-869ebd914cd2-c000.snappy.parquet target/test-data/01-data.snappy.parquet
% ls -l target/test-data
total 8
-rw-r--r-- 1 vincent vincent 911 2020-10-25 15:07 01-data.snappy.parquet
-rw-r--r-- 1 vincent vincent 666 2020-10-25 15:06 02-data.snappy.parquet
-rw-r--r-- 1 vincent vincent   0 2020-10-25 15:07 _SUCCESS

As you can see, the first parquet file now comes after the second one in alphanumeric order. When I read this directory, I get:

> spark.read.parquet("target/test-data").show(false)

+---+-------+-------+
|id |column1|column2|
+---+-------+-------+
|2  |value12|value22|
|1  |value11|null   |
+---+-------+-------+

The inferred schema is not the same as in the first read: just by changing the order of the files in the parquet directory, I changed the schema used to read the parquet data.
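
The difference is also visible on the inferred schema itself. Assuming id was written as an integer and the other columns as strings, the read on the renamed directory now infers three columns where the first read inferred only two:

> spark.read.parquet("target/test-data").printSchema()

root
 |-- id: integer (nullable = true)
 |-- column1: string (nullable = true)
 |-- column2: string (nullable = true)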

Why does file order matter?

When you call the .parquet(path) method on your DataFrameReader, Spark 3.0 goes through the following stack of methods:

DataFrameReader.load(path)
DataSource.lookupDataSourceV2
DataSourceV2Utils.getTableFromProvider

If you didn’t provide a schema, meaning you didn’t call .schema() on your DataFrameReader, this last method continues the call stack as follows:

TableProvider.inferSchema
FileTable.schema
FileTable.dataSchema
ParquetTable.inferSchema
ParquetUtils.inferSchema

The inferSchema method in object ParquetUtils is the important one here. It lists all the files in the path and then, if you didn’t set the mergeSchema option, tries to read the schema from the metadata files _common_metadata or _metadata, and, failing that, from the first data file:

if (shouldMergeSchemas) {
  ...
} else {
  filesByType.commonMetadata.headOption
    .orElse(filesByType.metadata.headOption)
    .orElse(filesByType.data.headOption)
    .toSeq
}

Since files are usually listed in alphanumeric order, renaming the parquet files in the data directory can change the schema inferred by Spark.

Conclusion

When inferring a schema on parquet data whose files have different schemas, the result depends on how Spark lists your parquet files, so you can end up with inconsistencies when loading the data, for instance columns disappearing as in our example.

So, unless you are sure the schema is the same for all parquet files, you should set the option mergeSchema to true when reading parquet data, to ensure that you apply the same schema every time you read the data.
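
With the files from this post, that looks like the read below: whatever the file names are, it always returns both column1 and column2, with null for the rows coming from the file that doesn’t have column2:

> spark.read.option("mergeSchema", "true").parquet("target/test-data").show(false)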

As this mergeSchema operation is rather expensive, if you know the input schema, for instance because you cast the read data to a case class with the DataFrame method .as[CaseClass], you should pass the schema to the DataFrameReader with the .schema method. You can easily extract the schema from a case class with the following code:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// CaseClass stands for your own case class describing the data
val schema = ScalaReflection.schemaFor[CaseClass].dataType.asInstanceOf[StructType]

spark.read.schema(schema).parquet(path)
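
If you prefer not to depend on ScalaReflection, which is an internal Catalyst API, the same schema can, I believe, also be obtained through the public Encoders API (again with CaseClass as a placeholder for your own case class):

import org.apache.spark.sql.Encoders

// same schema, derived through the public encoder API instead of the internal helper
val schema = Encoders.product[CaseClass].schema

spark.read.schema(schema).parquet(path)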

Source code to generate and read the parquet data used in this post is available here: https://github.com/vincentdoba/blog-examples/tree/master/20201025-spark-read-parquet