在Apache Spark中,使用大型RDD [MatrixEntry]时出现了超过了GC限制的情况。
在Apache Spark中,使用大型RDD [MatrixEntry]时出现了超过了GC限制的情况。
我有一个存储了用户-项目数据的csv文件,维度为6,365x214
,我正在使用org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
的columnSimilarities()
方法来计算用户之间的相似度。\n我的代码如下:\n
import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.distributed.{RowMatrix, MatrixEntry, CoordinateMatrix} import org.apache.spark.rdd.RDD def rddToCoordinateMatrix(input_rdd: RDD[String]) : CoordinateMatrix = { // 将RDD[String]转换为RDD[Tuple3] val coo_matrix_input: RDD[Tuple3[Long,Long,Double]] = input_rdd.map( line => line.split(',').toList ).map{ e => (e(0).toLong, e(1).toLong, e(2).toDouble) } // 将RDD[Tuple3]转换为RDD[MatrixEntry] val coo_matrix_matrixEntry: RDD[MatrixEntry] = coo_matrix_input.map(e => MatrixEntry(e._1, e._2, e._3)) // 将RDD[MatrixEntry]转换为CoordinateMatrix val coo_matrix: CoordinateMatrix = new CoordinateMatrix(coo_matrix_matrixEntry) return coo_matrix } // 读取CSV文件为RDD[String] val input_rdd: RDD[String] = sc.textFile("user_item.csv") // 将RDD[String]转换为CoordinateMatrix val coo_matrix = rddToCoordinateMatrix(input_rdd) // 转置CoordinateMatrix val coo_matrix_trans = coo_matrix.transpose() // 将CoordinateMatrix转换为RowMatrix val mat: RowMatrix = coo_matrix_trans.toRowMatrix() // 使用暴力方法完美计算相似的列 // 返回CoordinateMatrix val simsPerfect: CoordinateMatrix = mat.columnSimilarities() // 将CoordinateMatrix转换为RDD[MatrixEntry] val simsPerfect_entries = simsPerfect.entries simsPerfect_entries.count() // 将结果写入文件 val results_rdd = simsPerfect_entries.map(line => line.i+","+line.j+","+line.value) results_rdd.saveAsTextFile("similarity-output") // 关闭REPL终端 System.exit(0)
\n但是,当我在spark-shell上运行此脚本时,运行simsPerfect_entries.count()
代码行后出现以下错误:\n
java.lang.OutOfMemoryError: GC overhead limit exceeded
\n
更新:
\n我尝试了其他人提供的许多解决方案,但没有成功。\n1. 通过增加每个执行器进程可使用的内存量 spark.executor.memory=1g
\n2. 通过减少驱动程序进程的使用核心数 spark.driver.cores=1
\n请给我一些解决此问题的方法。
Apache Spark中出现的"GC overhead limit exceeded"错误,是因为JVM的垃圾回收器活动过于频繁,导致代码执行被停止。垃圾回收器活动过于频繁的原因可能是:
1. 产生了大量的小对象,然后立即丢弃这些对象。但是根据问题描述,这不是造成问题的原因。
2. 数据量超出了JVM堆内存的限制。比如,尝试将2GB的文本文件加载到只有1GB JVM堆内存的情况下。根据问题描述,这是造成问题的原因。
解决这个问题的方法是增加JVM堆内存的大小,可以在以下位置进行设置:
1. 如果你的Spark设置是分布式的,可以增加worker节点上的JVM堆内存。
2. 如果你使用的是spark-shell应用程序,可以通过添加额外的标记来增加driver-memory的大小。默认情况下,driver-memory是1g。可以将其增加到4g。示例如下:
$ spark-shell --driver-memory 4g
当你遇到“你产生了太多的小对象并且立即丢弃它们”的情况时,最好先进行测量(因为这是一个非常低的概率)。你可以创建一个单独的问题,描述你的情况,并提供示例和所有线索,以便更好地分析和解决这个问题。