User-defined aggregated functions are a powerful tool in Spark: you can avoid a lot of useless computation by crafting aggregated functions that does exactly what you want. However, sometimes their behavior can be surprising. For instance, be careful when using a custom aggregator over a windows ordered by a column that contains duplicate values: buffer is not flushed at each line but only when the value in ordering column changes.

Setup

Imagine we want to label each line of a dataframe with its line number according to an ordering. So for the following input dataframe:

+----------+
|date      |
+----------+
|2020-12-01|
|2020-12-02|
|2020-12-03|
|2020-12-04|
+----------+

We want to get the following result:

+----------+-----------+
|date      |line_number|
+----------+-----------+
|2020-12-01|1          |
|2020-12-02|2          |
|2020-12-03|3          |
|2020-12-04|4          |
+----------+-----------+

To do so, we can use the function row_number. But for the sake of the example, we create our own Aggregator instead:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.{Aggregator}

object LineNumber extends Aggregator[Int, Int, Int] {
  override def zero: Int = 0

  override def reduce(b: Int, a: Int): Int = b + 1

  override def merge(b1: Int, b2: Int): Int = b1 + b2

  override def finish(reduction: Int): Int = reduction

  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt

  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

And we apply this aggregator on a window to get the line number:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.udaf

val line_number = udaf(LineNumber)
dataframe.withColumn("line_number", line_number().over(Window.orderBy("date")))

Our LineNumber aggregator is initialized with a buffer set to 0 using zero method, then for each row in the input dataframe, the reduce method is applied and then the finish method to populate line_number column. So we have the following execution:

row apply reduce apply finish line_number value

2020-12-01

reduce(0, 1) = 0 + 1 ⇒ 1

finish(1) ⇒ 1

1

2020-12-02

reduce(1, 1) = 1 + 1 ⇒ 2

finish(2) ⇒ 2

2

2020-12-03

reduce(2, 1) = 2 + 1 ⇒ 3

finish(3) ⇒ 3

3

2020-12-04

reduce(3, 1) = 3 + 1 ⇒ 4

finish(4) ⇒ 4

4

So everything is fine, and we get the line number for each row

If we have duplicates on ordering column

But what happens if some values in the ordering column are not unique ? For instance if we have the following dataframe as input:

+----------+
|date      |
+----------+
|2020-12-01|
|2020-12-01|
|2020-12-02|
|2020-12-02|
+----------+

If we run our aggregator on this dataframe, we obtain the following result:

+----------+-----------+
|date      |line_number|
+----------+-----------+
|2020-12-01|2          |
|2020-12-01|2          |
|2020-12-02|4          |
|2020-12-02|4          |
+----------+-----------+

What happens ?

What triggers the call to finish method is actually a change on value of the ordering column. As long as the value in ordering column does not change, only the reduce function is called. Once the value of the next line is different, the finish method is called, and the result is added to all previous lines having the same value. So we have the following execution:

row apply reduce apply finish line_number value

2020-12-01

reduce(0, 1) = 0 + 1 ⇒ 1

not applied

2

2020-12-01

reduce(1, 1) = 1 + 1 ⇒ 2

finish(2) ⇒ 2

2

2020-12-02

reduce(2, 1) = 2 + 1 ⇒ 3

not applied

4

2020-12-02

reduce(3, 1) = 3 + 1 ⇒ 4

finish(4) ⇒ 4

4

Conclusion

I don’t currently know a method to prevent this behavior, except ensuring that values in columns used for ordering are unique. You can for instance add others columns to ordering or create a new column with random values to do that.

So when you create a custom aggregator that you want to run over a window, keep this behavior in your mind to avoid surprises.