Apache Spark dataframe lineage trimming via RDD and role of cache 通过RDD进行Apache Spark dataframe的血统修剪及缓存的作用

9 浏览
0 Comments

Apache Spark dataframe lineage trimming via RDD and role of cache 通过RDD进行Apache Spark dataframe的血统修剪及缓存的作用

有一个技巧可以用来精简Apache Spark dataframe的血统,特别是对于迭代计算来说:

def getCachedDataFrame(df: DataFrame): DataFrame = {
    val rdd = df.rdd.cache()
    df.sqlContext.createDataFrame(rdd, df.schema)
}

这看起来有点像纯魔法,但现在我想知道为什么我们需要在RDD上调用cache()方法?在这个血统精简逻辑中,使用缓存的目的是什么?

0
0 Comments

Apache Spark dataframe lineage trimming via RDD and role of cache

在理解缓存的目的之前,有必要了解不同类型的RDD操作:转换和动作。RDD支持两种类型的操作:转换操作和动作操作。转换操作是从现有数据集创建新数据集,而动作操作是在数据集上运行计算并将结果返回给驱动程序。Spark中的所有转换操作都是惰性的,即它们不会立即计算结果。相反,它们只会记住应用于某个基本数据集(例如文件)的转换操作。只有当动作操作需要将结果返回给驱动程序时,才会计算这些转换操作。这种设计使得Spark能够更高效地运行。例如,我们可以意识到通过map创建的数据集将在reduce中使用,并且只返回reduce的结果,而不是较大的映射数据集。

那么这与缓存有什么关系呢?

考虑以下代码:

// 读取一些数据

val df = spark.read.parquet("some_big_file.parquet")

// 对数据应用一些转换操作(这里是惰性操作)

val cleansedDF = df

.filter(filteringFunction)

.map(cleansingFunction)

// 执行一个动作操作,触发计算转换操作

cleansedDF.write.parquet("output_file.parquet")

// 再次执行一个动作操作,再次触发计算转换操作

println(s"You have ${cleansedDF.count} rows in the cleansed data")

在这里,我们读取了一些文件,对数据应用了一些转换操作,并对同一个数据框执行了两个动作操作:cleansedDF.write.parquet和cleansedDF.count。正如代码中的注释所解释的那样,如果我们像这样运行代码,实际上会计算这些转换操作两次。由于转换操作是惰性的,只有在动作操作需要执行它们时才会被执行。

那么我们如何避免这种重复计算呢?通过缓存:我们可以告诉Spark保留某些转换操作的结果,以便它们不必多次计算。这可以在磁盘/内存等位置进行缓存。因此,根据这个知识,我们的代码可以是这样的:

// 读取一些数据

val df = spark.read.parquet("some_big_file.parquet")

// 对数据应用一些转换操作(这里是惰性操作),并缓存该计算的结果

val cleansedDF = df

.filter(filteringFunction)

.map(cleansingFunction)

.cache

// 执行一个动作操作,触发计算转换操作并缓存结果

cleansedDF.write.parquet("output_file.parquet")

// 执行另一个动作操作,重用缓存数据

println(s"You have ${cleansedDF.count} rows in the cleansed data")

我已经调整了这个代码块中的注释,以突出与前一个代码块的区别。

注意,.persist也存在。使用.cache时,使用默认的存储级别,而使用.persist可以指定存储级别,如这个stackoverflow答案很好地解释了。

希望对您有所帮助!

0