OutOfMemoryError: Java堆空间和内存变量在Spark中

9 浏览
0 Comments

OutOfMemoryError: Java堆空间和内存变量在Spark中

我一直在尝试执行一个scala程序,但输出似乎总是像这样的东西:

15/08/17 14:13:14 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
at java.lang.AbstractStringBuilder.(AbstractStringBuilder.java:64)
at java.lang.StringBuilder.(StringBuilder.java:97)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:339)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2344)
at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:169)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

或者像这样

15/08/19 11:45:11 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:526)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:505)
    at com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:2846)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:2881)

这些错误是驱动程序还是执行程序的错误?

我对Spark使用的内存变量有些困惑。我当前的设置是

spark-env.sh

export SPARK_WORKER_MEMORY=6G
export SPARK_DRIVER_MEMORY=6G
export SPARK_EXECUTOR_MEMORY=4G

spark-defaults.conf

# spark.driver.memory              6G
# spark.executor.memory            4G
# spark.executor.extraJavaOptions  ' -Xms5G -Xmx5G '
# spark.driver.extraJavaOptions   ' -Xms5G -Xmx5G '

我需要取消spark-defaults.conf中的任何变量的注释吗,还是它们是多余的?

例如,设置SPARK_WORKER_MEMORY等同于设置spark.executor.memory吗?

我的scala代码的一部分,在几次迭代后停止:

   val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct.collect
    for (id <- filteredNodesGroups){
        val clusterGraph = connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id)
        val pagerankGraph = clusterGraph.pageRank(0.15)
        val completeClusterPagerankGraph = clusterGraph.outerJoinVertices(pagerankGraph.vertices) {
            case (uid, attrList, Some(pr)) => 
                attrList :+ ("inClusterPagerank:" + pr)
            case (uid, attrList, None) => 
                attrList :+ ""
        }
        val sortedClusterNodes = completeClusterPagerankGraph.vertices.toArray.sortBy(_._2(pagerankIndex + 1))
       println(sortedClusterNodes(0)._2(1) + " with rank: " + sortedClusterNodes(0)._2(pagerankIndex + 1))
     }        

很多问题都隐含在一个问题中。提前感谢您的回答!

0
0 Comments

Java heap space和Spark中的内存变量是常见的问题,通常会导致OutOfMemoryError错误。这个问题的出现原因是由于在Spark代码中使用了collect方法,该方法会将所有数据从执行器返回到驱动程序,而不经过任何处理。这可能导致大量的数据被加载到驱动程序的内存中,从而导致内存不足的错误。

为了解决这个问题,可以采用更加函数式的方法来处理数据。可以使用groupBy和map等操作来提取唯一的值,并对数据进行处理。另外,可以使用分片技术对数据进行排序,并将处理后的结果返回给驱动程序。

在上述代码中,还存在另一个问题,即需要对两个RDD[Graph[Array[String], String]]进行连接操作,但是不清楚应该使用哪个键进行连接,并且这可能会返回一个RDD的RDD。需要进一步研究解决方案。

整个代码的目标是将图拆分为较小的子图,然后在每个子图上计算PageRank,并找出具有最大PageRank值的节点。大多数子图的大小在2-20个节点之间,但也有一个子图包含约13,000个节点和几千条边。然而,计算通常在达到该子图之前停止,导致内存开销过大。需要找到解决方案来解决这个问题。

以下是完整的代码示例:

def clusterElements(connCompGraph: Graph[Array[String], String], pagerankIndex: Int) = {
  val filteredNodesGroups = connCompGraph.vertices.map{ case(_, array) => array(pagerankIndex) }.distinct()
  val clusterGraphs = filteredNodesGroups.map { id => connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id) }
  val pageRankGraphs = clusterGraphs.map(_.pageRank(0.15))
  // 继续处理结果...
}

通过使用更加函数式的方法处理数据,可以减少内存的使用,从而避免出现Java heap space错误。对于连接操作的问题,仍需要进一步研究找到解决方案。

0