Yesterday, I ran into a behavior of Spark’s DataFrameReader that can be misleading when reading Parquet data.
If a parquet data directory contains several parquet files with different schemas, and we neither provide a schema nor
use the mergeSchema option, the inferred schema depends on the order of the parquet files in the directory.
The setup
I am reading data stored in Parquet format. The data is split into two parquet files, each with a different schema. The first parquet file contains the data below:
+---+-------+
|id |column1|
+---+-------+
|1  |value11|
+---+-------+
And the second one containing the data below:
+---+-------+-------+
|id |column1|column2|
+---+-------+-------+
|2  |value12|value22|
+---+-------+-------+
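For reference, here is a minimal sketch of how such a directory might be generated (the actual generation code is in the repository linked at the end of this post):
import spark.implicits._

// First file: schema (id, column1)
Seq((1, "value11")).toDF("id", "column1")
  .write.parquet("target/test-data")

// Second file: schema (id, column1, column2), appended to the same directory
Seq((2, "value12", "value22")).toDF("id", "column1", "column2")
  .write.mode("append").parquet("target/test-data")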
So I have my two files in a directory, as you can see here:
% ls -l target/test-data
total 8
-rw-r--r-- 1 vincent vincent 666 2020-10-25 15:06 part-00000-b6aaae26-9ca4-42fe-bef2-2063b454ec5f-c000.snappy.parquet
-rw-r--r-- 1 vincent vincent 911 2020-10-25 15:07 part-00000-d6d836de-0de8-424a-91cc-869ebd914cd2-c000.snappy.parquet
-rw-r--r-- 1 vincent vincent 0 2020-10-25 15:07 _SUCCESS
Loading parquet files
If I load the parquet data with the plain Spark read method, I get:
> spark.read.parquet("target/test-data").show(false)
+---+-------+
|id |column1|
+---+-------+
|2  |value12|
|1  |value11|
+---+-------+
So far so good.
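You can also confirm which schema was inferred by printing it; with this file layout, column2 does not appear in the schema:
> spark.read.parquet("target/test-data").printSchema()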
Let’s do a little experiment: I change the listing order of the parquet files in the directory target/test-data
by renaming them:
% mv target/test-data/part-00000-b6aaae26-9ca4-42fe-bef2-2063b454ec5f-c000.snappy.parquet target/test-data/02-data.snappy.parquet
% mv target/test-data/part-00000-d6d836de-0de8-424a-91cc-869ebd914cd2-c000.snappy.parquet target/test-data/01-data.snappy.parquet
% ls -l target/test-data
total 8
-rw-r--r-- 1 vincent vincent 911 2020-10-25 15:07 01-data.snappy.parquet
-rw-r--r-- 1 vincent vincent 666 2020-10-25 15:06 02-data.snappy.parquet
-rw-r--r-- 1 vincent vincent 0 2020-10-25 15:07 _SUCCESS
As you can see, the first parquet file now comes after the second one in alphanumeric order. When I read this directory, I get:
> spark.read.parquet("target/test-data").show(false)
+---+-------+-------+
|id |column1|column2|
+---+-------+-------+
|2  |value12|value22|
|1  |value11|null   |
+---+-------+-------+
The inferred schema is not the same as in the first read: just by changing the order of the files in the parquet directory, I have changed the schema used to read the data.
Why file order matters
When calling the .parquet(path) method on your DataFrameReader, you call the following stack of methods in Spark 3.0:
DataFrameReader.load(path)
DataSource.lookupDataSourceV2
DataSourceV2Utils.getTableFromProvider
This last method, if you didn’t provide a schema (that is, if you didn’t call .schema() on your DataFrameReader), continues the calling stack as follows:
TableProvider.inferSchema
FileTable.schema
FileTable.dataSchema
ParquetTable.inferSchema
ParquetUtils.inferSchema
The inferSchema method in the ParquetUtils object is the important one here. It retrieves all files in the path and then, if you didn’t set the mergeSchema option, tries to retrieve the schema from the metadata files _common_metadata or _metadata, and if none are present, falls back to the schema of the first data file:
if (shouldMergeSchemas) {
  ...
} else {
  // No schema merging: pick a single file to read the schema from,
  // preferring _common_metadata, then _metadata, then the first data file
  filesByType.commonMetadata.headOption
    .orElse(filesByType.metadata.headOption)
    .orElse(filesByType.data.headOption)
    .toSeq
}
Since files are usually listed in alphanumeric order, this means that by changing the names of the parquet files in the data directory, you can change the schema inferred by Spark.
Conclusion
When inferring the schema of parquet data containing different schemas, the result depends on how Spark lists your parquet files, so you can end up with inconsistencies when loading the data, for instance columns disappearing as in our example.
So, unless you are sure the schema is the same for all parquet files, you should set the option mergeSchema to true when reading parquet data, to ensure that every time you read the data, you apply the same schema.
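With the files above, for instance, enabling mergeSchema gives the union of both schemas (id, column1 and column2) regardless of file order:
> spark.read.option("mergeSchema", "true").parquet("target/test-data").show(false)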
As this mergeSchema operation is rather expensive, if you know the input schema, for instance if you cast the read data to a case class with the DataFrame method .as[CaseClass], you should instead pass the schema to the DataFrameReader with the .schema method. You can easily extract the schema from a case class with the following code:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val schema = ScalaReflection.schemaFor[CaseClass].dataType.asInstanceOf[StructType]
spark.read.schema(schema).parquet(path)
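For instance, with a hypothetical Record case class matching the full schema of our example (the column types here are assumptions):
// Hypothetical case class; column2 is an Option since it is missing from the first file
case class Record(id: Int, column1: String, column2: Option[String])

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import spark.implicits._

val schema = ScalaReflection.schemaFor[Record].dataType.asInstanceOf[StructType]
val records = spark.read.schema(schema).parquet("target/test-data").as[Record]
records.show(false)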
Source code to generate and read the parquet data used in this post is available here: https://github.com/vincentdoba/blog-examples/tree/master/20201025-spark-read-parquet