在Apache Spark中,使用大型RDD [MatrixEntry]时出现了超过了GC限制的情况。

12 浏览
0 Comments

在Apache Spark中,使用大型RDD [MatrixEntry]时出现了超过了GC限制的情况。

我有一个存储了用户-项目数据的csv文件,维度为6,365x214,我正在使用org.apache.spark.mllib.linalg.distributed.CoordinateMatrixcolumnSimilarities()方法来计算用户之间的相似度。\n我的代码如下:\n

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, MatrixEntry, CoordinateMatrix}
import org.apache.spark.rdd.RDD
def rddToCoordinateMatrix(input_rdd: RDD[String]) : CoordinateMatrix = {
    // 将RDD[String]转换为RDD[Tuple3]
    val coo_matrix_input: RDD[Tuple3[Long,Long,Double]] = input_rdd.map(
        line => line.split(',').toList
    ).map{
        e => (e(0).toLong, e(1).toLong, e(2).toDouble)
    }
    // 将RDD[Tuple3]转换为RDD[MatrixEntry]
    val coo_matrix_matrixEntry: RDD[MatrixEntry] = coo_matrix_input.map(e => MatrixEntry(e._1, e._2, e._3))
    // 将RDD[MatrixEntry]转换为CoordinateMatrix
    val coo_matrix: CoordinateMatrix = new CoordinateMatrix(coo_matrix_matrixEntry)
    return coo_matrix
}
// 读取CSV文件为RDD[String]
val input_rdd: RDD[String] = sc.textFile("user_item.csv")
// 将RDD[String]转换为CoordinateMatrix
val coo_matrix = rddToCoordinateMatrix(input_rdd)
// 转置CoordinateMatrix
val coo_matrix_trans = coo_matrix.transpose()
// 将CoordinateMatrix转换为RowMatrix
val mat: RowMatrix = coo_matrix_trans.toRowMatrix()
// 使用暴力方法完美计算相似的列
// 返回CoordinateMatrix
val simsPerfect: CoordinateMatrix = mat.columnSimilarities()
// 将CoordinateMatrix转换为RDD[MatrixEntry]
val simsPerfect_entries = simsPerfect.entries
simsPerfect_entries.count()
// 将结果写入文件
val results_rdd = simsPerfect_entries.map(line => line.i+","+line.j+","+line.value)
results_rdd.saveAsTextFile("similarity-output")
// 关闭REPL终端
System.exit(0)

\n但是,当我在spark-shell上运行此脚本时,运行simsPerfect_entries.count()代码行后出现以下错误:\n

java.lang.OutOfMemoryError: GC overhead limit exceeded

\n

更新:

\n我尝试了其他人提供的许多解决方案,但没有成功。\n1. 通过增加每个执行器进程可使用的内存量 spark.executor.memory=1g\n2. 通过减少驱动程序进程的使用核心数 spark.driver.cores=1\n请给我一些解决此问题的方法。

0
0 Comments

Apache Spark中出现的"GC overhead limit exceeded"错误,是因为JVM的垃圾回收器活动过于频繁,导致代码执行被停止。垃圾回收器活动过于频繁的原因可能是:

1. 产生了大量的小对象,然后立即丢弃这些对象。但是根据问题描述,这不是造成问题的原因。

2. 数据量超出了JVM堆内存的限制。比如,尝试将2GB的文本文件加载到只有1GB JVM堆内存的情况下。根据问题描述,这是造成问题的原因。

解决这个问题的方法是增加JVM堆内存的大小,可以在以下位置进行设置:

1. 如果你的Spark设置是分布式的,可以增加worker节点上的JVM堆内存。

2. 如果你使用的是spark-shell应用程序,可以通过添加额外的标记来增加driver-memory的大小。默认情况下,driver-memory是1g。可以将其增加到4g。示例如下:

$ spark-shell --driver-memory 4g

当你遇到“你产生了太多的小对象并且立即丢弃它们”的情况时,最好先进行测量(因为这是一个非常低的概率)。你可以创建一个单独的问题,描述你的情况,并提供示例和所有线索,以便更好地分析和解决这个问题。

0