User-defined aggregate functions are a powerful tool in Spark: you can avoid a lot of useless computation by crafting aggregate functions that do exactly what you want. However, their behavior can sometimes be surprising. For instance, be careful when using a custom aggregator over a window ordered by a column that contains duplicate values: the buffer is not flushed at each row, but only when the value in the ordering column changes.
Setup
Imagine we want to label each line of a dataframe with its line number according to an ordering. So for the following input dataframe:
+----------+
|date      |
+----------+
|2020-12-01|
|2020-12-02|
|2020-12-03|
|2020-12-04|
+----------+
We want to get the following result:
+----------+-----------+
|date      |line_number|
+----------+-----------+
|2020-12-01|1          |
|2020-12-02|2          |
|2020-12-03|3          |
|2020-12-04|4          |
+----------+-----------+
To do so, we could simply use the built-in function row_number. But for the sake of the example, we create our own Aggregator instead:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object LineNumber extends Aggregator[Int, Int, Int] {
  // Initial buffer value
  override def zero: Int = 0
  // Increment the buffer for each input row (the input value itself is ignored)
  override def reduce(b: Int, a: Int): Int = b + 1
  // Combine two partial buffers
  override def merge(b1: Int, b2: Int): Int = b1 + b2
  // The final value is simply the buffer
  override def finish(reduction: Int): Int = reduction
  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
And we apply this aggregator on a window to get the line number:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, udaf}

val line_number = udaf(LineNumber)
// The aggregator declares one Int input column, so we feed it a constant
dataframe.withColumn("line_number", line_number(lit(1)).over(Window.orderBy("date")))
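For reference, the built-in row_number mentioned above would produce the same result directly:

```scala
import org.apache.spark.sql.functions.row_number

// Built-in equivalent: assigns 1, 2, 3, ... following the window ordering,
// even when the ordering column contains ties
dataframe.withColumn("line_number", row_number().over(Window.orderBy("date")))
```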
Our LineNumber aggregator is initialized with a buffer set to 0 by the zero method; then, for each row of the input dataframe, the reduce method is applied, followed by the finish method to populate the line_number column. So we have the following execution:
row | apply reduce | apply finish | line_number value |
---|---|---|---|
2020-12-01 | reduce(0, 1) = 0 + 1 ⇒ 1 | finish(1) ⇒ 1 | 1 |
2020-12-02 | reduce(1, 1) = 1 + 1 ⇒ 2 | finish(2) ⇒ 2 | 2 |
2020-12-03 | reduce(2, 1) = 2 + 1 ⇒ 3 | finish(3) ⇒ 3 | 3 |
2020-12-04 | reduce(3, 1) = 3 + 1 ⇒ 4 | finish(4) ⇒ 4 | 4 |
So everything is fine, and we get the expected line number for each row.
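If you want to reproduce this run end to end, here is a minimal sketch; the local SparkSession setup and the toDF construction are my additions, not part of the original snippet:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, udaf}

// Local session for the demo only; in a real job the session already exists
val spark = SparkSession.builder().master("local[1]").appName("line-number-demo").getOrCreate()
import spark.implicits._

val dataframe = Seq("2020-12-01", "2020-12-02", "2020-12-03", "2020-12-04").toDF("date")
val line_number = udaf(LineNumber)

dataframe
  .withColumn("line_number", line_number(lit(1)).over(Window.orderBy("date")))
  .show(false)
// shows line numbers 1, 2, 3, 4, matching the table above
```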
If we have duplicates in the ordering column
But what happens if some values in the ordering column are not unique? For instance, if we have the following dataframe as input:
+----------+
|date      |
+----------+
|2020-12-01|
|2020-12-01|
|2020-12-02|
|2020-12-02|
+----------+
If we run our aggregator on this dataframe, we obtain the following result:
+----------+-----------+
|date      |line_number|
+----------+-----------+
|2020-12-01|2          |
|2020-12-01|2          |
|2020-12-02|4          |
|2020-12-02|4          |
+----------+-----------+
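Assuming the same session and aggregator as in the previous sketch, this surprising result can be reproduced with:

```scala
// Same window expression as before, but with duplicated dates in the input
val duplicated = Seq("2020-12-01", "2020-12-01", "2020-12-02", "2020-12-02").toDF("date")

duplicated
  .withColumn("line_number", line_number(lit(1)).over(Window.orderBy("date")))
  .show(false)
// prints 2, 2, 4, 4 instead of the expected 1, 2, 3, 4
```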
What happens?
What actually triggers the call to the finish method is a change in the value of the ordering column. As long as the value in the ordering column does not change, only the reduce function is called. Once the value on the next line is different, the finish method is called, and its result is assigned to all previous lines having the same value. So we have the following execution:
row | apply reduce | apply finish | line_number value |
---|---|---|---|
2020-12-01 | reduce(0, 1) = 0 + 1 ⇒ 1 | not applied | 2 |
2020-12-01 | reduce(1, 1) = 1 + 1 ⇒ 2 | finish(2) ⇒ 2 | 2 |
2020-12-02 | reduce(2, 1) = 2 + 1 ⇒ 3 | not applied | 4 |
2020-12-02 | reduce(3, 1) = 3 + 1 ⇒ 4 | finish(4) ⇒ 4 | 4 |
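To make the call sequence concrete, here is a plain-Scala replay of this trace; this is only an illustration of the same call order, not Spark's actual window execution code:

```scala
// Replay the duplicate-dates trace: finish only fires when the date changes,
// and its result stands for every row sharing that date
val dates = Seq("2020-12-01", "2020-12-01", "2020-12-02", "2020-12-02")
var buffer = LineNumber.zero
dates.zipWithIndex.foreach { case (date, i) =>
  buffer = LineNumber.reduce(buffer, 1)
  val lastPeer = i == dates.length - 1 || dates(i + 1) != date
  if (lastPeer) println(s"$date: finish($buffer) = ${LineNumber.finish(buffer)}")
  else println(s"$date: finish not applied")
}
```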
Conclusion
I don’t currently know a way to prevent this behavior, other than ensuring that the values in the columns used for ordering are unique. You can, for instance, add other columns to the ordering, or create a new column with unique values to do that.
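As a sketch of that workaround (the tie_breaker column name is mine; monotonically_increasing_id is one built-in way to get a unique value per row):

```scala
import org.apache.spark.sql.functions.{lit, monotonically_increasing_id}

// Add a unique tie-breaking column so that no two rows are peers in the ordering
val withTieBreaker = dataframe.withColumn("tie_breaker", monotonically_increasing_id())

withTieBreaker.withColumn(
  "line_number",
  line_number(lit(1)).over(Window.orderBy("date", "tie_breaker"))
)
// every row now has a distinct ordering value, so finish runs once per row
```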
So when you create a custom aggregator that you want to run over a window, keep this behavior in mind to avoid surprises.
Complete source code is available here: https://github.com/vincentdoba/blog-examples/tree/master/20201206-spark-aggregator-window-duplicates