Spark distinct count after caching: inconsistent results

With Spark 2.3.0 (Scala) I am getting non-deterministic results when calling distinct().count() on a DataFrame, both with and without caching it. The following three lines print three different values (not monotonically increasing or decreasing, just intermixed), and the values also differ across runs. Column "item" is of type Int.

df.cache()

println(df.select("item").distinct().count())
println(df.select("item").distinct().count())
println(df.select("item").distinct().count())

I have also tried checkpointing (after setting a checkpoint directory), but the same thing happens. The DF does not originate from a database but from a static file on my PC (the file never changes), and it has gone through Spark SQL manipulations involving aggregations with row_number and random number generation. The RNG has been seeded (correctly, I hope), but even if there were a bug in the RNG initialization (the DF is generated with a mapPartitions call), I would expect the counts to be identical because the DF is cached. I have tried every storage level (MEMORY_ONLY, DISK_ONLY, MEMORY_AND_DISK, ...), and I have also tried caching the underlying RDD (with df.rdd.cache()), with no luck.
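For reference, the generation step looks roughly like this (a minimal sketch rather than my real code; the base data, the schema, and the seeding are only illustrative):

import scala.util.Random
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Stand-in for the static file; the real source is a file on disk.
val base = spark.range(0, 1000000L)

val df = base.mapPartitions { it =>
  // Hypothetical seeding: if the seed depends on anything time- or
  // identity-based, every recomputation of a partition produces
  // different rows, so any action that misses the cache sees new data.
  val rng = new Random(System.nanoTime())
  it.map(i => (i.longValue, rng.nextInt(100)))
}.toDF("id", "item")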

I cannot even fit a StringIndexer to the DF because, when I try

new StringIndexer(...).setInputCol("item").fit(df).transform(df)

I get an "Unseen label" error in transform(), even though I am passing the very same DF to fit and to transform, one immediately after the other. It looks as if the DF is being reconstructed (differently) every time I refer to it.
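Spelled out with imports, the call is the standard Spark ML pattern (the output column name here is just an illustrative choice):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("item")
  .setOutputCol("itemIndex") // illustrative name

// fit() learns the set of labels from the input column; transform()
// fails with "Unseen label" if the DF presents a value at transform
// time that fit() never saw.
val indexed = indexer.fit(df).transform(df)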

Strangely, if I do

println(df.select("item").collect().distinct.length)
println(df.select("item").collect().distinct.length)
println(df.select("item").collect().distinct.length)

just after caching, then I do get the same result every time.

Any ideas would be very welcome.

Thanks
