Apache Spark dataframe lineage trimming via RDD and role of cache 通过RDD进行Apache Spark dataframe的血统修剪及缓存的作用
Apache Spark dataframe lineage trimming via RDD and role of cache 通过RDD进行Apache Spark dataframe的血统修剪及缓存的作用
有一个技巧可以用来精简Apache Spark dataframe的血统,特别是对于迭代计算来说:
def getCachedDataFrame(df: DataFrame): DataFrame = { val rdd = df.rdd.cache() df.sqlContext.createDataFrame(rdd, df.schema) }
这看起来有点像纯魔法,但现在我想知道为什么我们需要在RDD上调用cache()
方法?在这个血统精简逻辑中,使用缓存的目的是什么?
Apache Spark dataframe lineage trimming via RDD and role of cache
在理解缓存的目的之前,有必要了解不同类型的RDD操作:转换和动作。RDD支持两种类型的操作:转换操作和动作操作。转换操作是从现有数据集创建新数据集,而动作操作是在数据集上运行计算并将结果返回给驱动程序。Spark中的所有转换操作都是惰性的,即它们不会立即计算结果。相反,它们只会记住应用于某个基本数据集(例如文件)的转换操作。只有当动作操作需要将结果返回给驱动程序时,才会计算这些转换操作。这种设计使得Spark能够更高效地运行。例如,我们可以意识到通过map创建的数据集将在reduce中使用,并且只返回reduce的结果,而不是较大的映射数据集。
那么这与缓存有什么关系呢?
考虑以下代码:
// 读取一些数据
val df = spark.read.parquet("some_big_file.parquet")
// 对数据应用一些转换操作(这里是惰性操作)
val cleansedDF = df
.filter(filteringFunction)
.map(cleansingFunction)
// 执行一个动作操作,触发计算转换操作
cleansedDF.write.parquet("output_file.parquet")
// 再次执行一个动作操作,再次触发计算转换操作
println(s"You have ${cleansedDF.count} rows in the cleansed data")
在这里,我们读取了一些文件,对数据应用了一些转换操作,并对同一个数据框执行了两个动作操作:cleansedDF.write.parquet和cleansedDF.count。正如代码中的注释所解释的那样,如果我们像这样运行代码,实际上会计算这些转换操作两次。由于转换操作是惰性的,只有在动作操作需要执行它们时才会被执行。
那么我们如何避免这种重复计算呢?通过缓存:我们可以告诉Spark保留某些转换操作的结果,以便它们不必多次计算。这可以在磁盘/内存等位置进行缓存。因此,根据这个知识,我们的代码可以是这样的:
// 读取一些数据
val df = spark.read.parquet("some_big_file.parquet")
// 对数据应用一些转换操作(这里是惰性操作),并缓存该计算的结果
val cleansedDF = df
.filter(filteringFunction)
.map(cleansingFunction)
.cache
// 执行一个动作操作,触发计算转换操作并缓存结果
cleansedDF.write.parquet("output_file.parquet")
// 执行另一个动作操作,重用缓存数据
println(s"You have ${cleansedDF.count} rows in the cleansed data")
我已经调整了这个代码块中的注释,以突出与前一个代码块的区别。
注意,.persist也存在。使用.cache时,使用默认的存储级别,而使用.persist可以指定存储级别,如这个stackoverflow答案很好地解释了。
希望对您有所帮助!