OutOfMemoryError: Java heap space and memory variables in Spark
I have been trying to run a Scala program, but the output always looks something like this:
15/08/17 14:13:14 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
    at java.lang.StringBuilder.<init>(StringBuilder.java:97)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:339)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:83)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2344)
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:169)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1215)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
Or like this:
15/08/19 11:45:11 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:526)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider$Impl.createInstance(DefaultSerializerProvider.java:505)
    at com.fasterxml.jackson.databind.ObjectMapper._serializerProvider(ObjectMapper.java:2846)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:17)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:1902)
    at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:280)
    at com.fasterxml.jackson.core.JsonGenerator.writeObjectField(JsonGenerator.java:1255)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:22)
    at org.json4s.jackson.JValueSerializer.serialize(JValueSerializer.scala:7)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:128)
    at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:2881)
Are these errors coming from the driver or from the executors?
I am somewhat confused about the memory variables that Spark uses. My current setup is:
spark-env.sh
export SPARK_WORKER_MEMORY=6G
export SPARK_DRIVER_MEMORY=6G
export SPARK_EXECUTOR_MEMORY=4G
spark-defaults.conf
# spark.driver.memory              6G
# spark.executor.memory            4G
# spark.executor.extraJavaOptions  ' -Xms5G -Xmx5G '
# spark.driver.extraJavaOptions    ' -Xms5G -Xmx5G '
Do I need to uncomment any of the variables in spark-defaults.conf, or are they redundant?
For example, is setting SPARK_WORKER_MEMORY equivalent to setting spark.executor.memory?
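To make that concrete, I could presumably also pass the same values on the command line instead of through spark-env.sh (the class and jar names below are just placeholders):

spark-submit \
    --driver-memory 6G \
    --executor-memory 4G \
    --class com.example.MyApp \
    my-app.jar

Is that command line equivalent to the exports above, or do the two mechanisms interact?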
Part of my Scala code, which stops after a few iterations:
val filteredNodesGroups = connCompGraph.vertices.map { case (_, array) => array(pagerankIndex) }.distinct.collect
for (id <- filteredNodesGroups) {
    val clusterGraph = connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id)
    val pagerankGraph = clusterGraph.pageRank(0.15)
    val completeClusterPagerankGraph = clusterGraph.outerJoinVertices(pagerankGraph.vertices) {
        case (uid, attrList, Some(pr)) => attrList :+ ("inClusterPagerank:" + pr)
        case (uid, attrList, None)     => attrList :+ ""
    }
    val sortedClusterNodes = completeClusterPagerankGraph.vertices.toArray.sortBy(_._2(pagerankIndex + 1))
    println(sortedClusterNodes(0)._2(1) + " with rank: " + sortedClusterNodes(0)._2(pagerankIndex + 1))
}
A lot of questions are packed into this one post. Thanks in advance for your answers!
Java heap space errors and confusion about Spark's memory variables are a common problem, and they usually end in an OutOfMemoryError. In this case the error appears because the code calls collect, which returns all of the data from the executors to the driver without any reduction; loading that much data into the driver's memory is what exhausts the heap.
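As a minimal sketch of that difference (reusing the variable names from the question, so the snippet is illustrative rather than original code):

// The distinct group ids stay distributed across the executors here.
val groupIds = connCompGraph.vertices
    .map { case (_, attrs) => attrs(pagerankIndex) }
    .distinct()

// collect() copies every one of those values into the driver JVM's heap;
// with a large result this is exactly where the OutOfMemoryError surfaces.
val groupIdsOnDriver: Array[String] = groupIds.collect()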
To fix this, handle the data in a more functional way: use operations such as groupBy and map to extract the distinct values and process them, and sort the data within the partitions so that only the processed result is returned to the driver.
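One hedged sketch of that last point, keeping the question's graph type and the pagerankIndex + 1 attribute position (the helper itself is made up for illustration): instead of pulling every vertex to the driver with toArray and sorting there, let the executors order the vertices and ship back only the single record that is needed, for example with takeOrdered:

import org.apache.spark.graphx.{Graph, VertexId}

// Hypothetical helper: returns the vertex whose appended "inClusterPagerank:..."
// attribute sorts first, without materialising the whole vertex RDD on the driver.
// The string comparison mirrors the original sortBy(_._2(pagerankIndex + 1)).
def topVertexByPagerank(graph: Graph[Array[String], String],
                        pagerankIndex: Int): (VertexId, Array[String]) =
    graph.vertices
        .takeOrdered(1)(Ordering.by { v: (VertexId, Array[String]) => v._2(pagerankIndex + 1) })
        .head

Only one (VertexId, Array[String]) pair ever reaches the driver, instead of the whole sorted array.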
There is one more problem in the code above: two RDD[Graph[Array[String], String]] would have to be joined, but it is not clear which key to join on, and the operation could end up producing an RDD of RDDs. This still needs further investigation.
The goal of the whole program is to split the graph into smaller subgraphs, run PageRank on each subgraph, and find the node with the highest PageRank value. Most subgraphs have between 2 and 20 nodes, but one of them contains about 13,000 nodes and several thousand edges. The computation usually dies before it even reaches that subgraph because the memory overhead becomes too large, so a real solution is still needed.
Here is the full code example:
def clusterElements(connCompGraph: Graph[Array[String], String], pagerankIndex: Int) = {
    val filteredNodesGroups = connCompGraph.vertices.map { case (_, array) => array(pagerankIndex) }.distinct()
    val clusterGraphs = filteredNodesGroups.map { id =>
        connCompGraph.subgraph(vpred = (_, attr) => attr(pagerankIndex) == id)
    }
    val pageRankGraphs = clusterGraphs.map(_.pageRank(0.15))
    // continue processing the results...
}
Processing the data in this more functional style keeps memory usage down and should avoid the Java heap space error; the join question above still needs more work.