With Spark 2.3.0 (Scala) I am getting non-deterministic results when calling distinct().count() on a DataFrame, both with and without caching it. The following three lines print three different values (neither increasing nor decreasing, just intermixed), and the values also differ from run to run. Column "item" is of type Int.
df.cache()
println(df.select("item").distinct().count())
println(df.select("item").distinct().count())
println(df.select("item").distinct().count())
I have also tried checkpointing (setting a checkpoint directory via sparkContext.setCheckpointDir first), but the same thing happens. The DataFrame does not come from a database; it originates from a static file on my PC (which never changes) and then goes through Spark SQL manipulations involving aggregations with row_number and random number generation. The RNG is initialized (I hope correctly) inside a mapPartitions that generates the DF, but even if there were a bug in the RNG initialization, I would expect the counts to be identical because I have cached the DF. I have tried every storage level (MEMORY_ONLY, DISK_ONLY, MEMORY_AND_DISK, ...). I have also tried caching the underlying RDD with df.rdd.cache(), with no luck.
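For reference, the generation looks roughly like this (heavily simplified; the path, column names, and the nextInt bound are placeholders, and the real code does more work inside mapPartitions):

import scala.util.Random
import spark.implicits._  // spark is the SparkSession

// One RNG per partition, seeded from the partition index, so a
// recomputed partition should reproduce the same random values.
val df = spark.sparkContext
  .textFile("file:///path/to/static/file")          // placeholder path
  .mapPartitionsWithIndex { (idx, lines) =>
    val rng = new Random(idx)                       // deterministic per-partition seed
    lines.map(line => (line, rng.nextInt(1000)))    // second field becomes "item"
  }
  .toDF("line", "item")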
I cannot even fit a StringIndexer to the DF because, when I try
new StringIndexer(...).setInputCol("item").fit(df).transform(df)
I get an "Unseen label" error in the transform(), even though I am passing the very same DF to fit and to transform, one immediately after the other. It looks like the DF is being reconstructed (differently) every time I refer to it.
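Spelled out, the attempt looks roughly like this (the output column name is just a placeholder):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("item")
  .setOutputCol("itemIndex")   // placeholder name

val model = indexer.fit(df)    // fit on df...
model.transform(df).show()     // ...then transform the very same df: "Unseen label" thrown here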
Strangely, if I do
println(df.select("item").collect().distinct.length)
println(df.select("item").collect().distinct.length)
println(df.select("item").collect().distinct.length)
just after cache(), then I do get the same result every time.
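For clarity, this is the pattern I would have expected to pin the data, since (as I understand it) cache() is lazy until an action runs; a sketch of my expectation:

df.cache()
df.count()   // action: should materialize every partition into the cache
println(df.select("item").distinct().count())   // from here on I would
println(df.select("item").distinct().count())   // expect identical values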
Any ideas would be very welcome.
Thanks