Sometimes Spark "optimizes" a dataframe plan in an inefficient way. Consider the following example in Spark 2.1 (can also be reproduced in Spark 1.6):
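import org.apache.spark.sql.functions.udf
// sparkContext and tablename are defined elsewhere; toDF and $ need the SQL implicits in scope
val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")
val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d }) // simulates ~100 ms of work per row
val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))
df_result
  .coalesce(1)
  .write // in Spark 2.x, saveAsTable lives on the DataFrameWriter
  .saveAsTable(tablename)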
In this example I want to write a single file after an expensive transformation of a dataframe (this is just an example to demonstrate the issue). Spark moves the coalesce(1) up so that the UDF is applied to a dataframe containing only 1 partition, which destroys parallelism (interestingly, repartition(1) does not behave this way).
To generalize, this behavior occurs when I want to increase parallelism in a certain part of my transformation, but decrease parallelism thereafter.
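One way to see the difference is to compare the physical plans of the two variants. The following is only a minimal sketch, assuming Spark 2.x with a local SparkSession; the object name, app name and local[4] master are placeholders that are not part of my actual job. The repartition(1) plan should contain an Exchange (shuffle) node between the UDF projection and the single output partition, while the coalesce(1) plan should not.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical standalone driver, only for inspecting the plans.
object CoalescePlanCheck {
  def main(args: Array[String]): Unit = {
    // Assumption: local session with 4 cores; adjust for a real cluster.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("coalesce-plan-check")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext
      .parallelize((1 to 500).map(_ => scala.util.Random.nextDouble()), 100)
      .toDF("value")
    val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })
    val dfResult = df.withColumn("udfResult", expensiveUDF($"value"))

    // explain() only prints the plan; the UDF is not executed here.
    println("--- coalesce(1) ---")
    dfResult.coalesce(1).explain()

    println("--- repartition(1) ---")
    dfResult.repartition(1).explain()

    spark.stop()
  }
}
```

Without a shuffle boundary, the UDF projection and the coalesce end up in the same stage, so that stage runs as a single task.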
I've found one workaround which consists of caching the dataframe and then triggering the complete evaluation of the dataframe:
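import org.apache.spark.sql.functions.udf
// sparkContext and tablename are defined elsewhere; toDF and $ need the SQL implicits in scope
val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")
val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d }) // simulates ~100 ms of work per row
val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))
  .cache
df_result.rdd.count // trigger computation so the UDF runs on all 100 partitions
df_result
  .coalesce(1) // now only merges the already cached partitions
  .write // in Spark 2.x, saveAsTable lives on the DataFrameWriter
  .saveAsTable(tablename)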
My question is: is there another way to tell Spark not to decrease parallelism in such cases?
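For comparison, this is roughly what the repartition(1) variant mentioned above looks like end to end. It is a sketch under assumptions, not a recommendation: the object name, the table name udf_result_demo, the overwrite mode and the local SparkSession are placeholders. It keeps the UDF stage at 100 tasks, but it pays for that with a full shuffle of the result into one partition, which is what I would like to avoid if possible.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical standalone driver showing the repartition(1) variant.
object RepartitionBeforeWrite {
  def main(args: Array[String]): Unit = {
    // Assumption: local session with 4 cores and the default (non-Hive) catalog.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("repartition-before-write")
      .getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext
      .parallelize((1 to 500).map(_ => scala.util.Random.nextDouble()), 100)
      .toDF("value")
    val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })
    val dfResult = df.withColumn("udfResult", expensiveUDF($"value"))

    dfResult
      .repartition(1)              // shuffle: the UDF stage above keeps its 100 partitions
      .write
      .mode("overwrite")           // placeholder choice so the sketch can be re-run
      .saveAsTable("udf_result_demo") // placeholder table name

    spark.stop()
  }
}
```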