Spark在按键分组时内存不足。
Spark在按键分组时内存不足。
我正试图使用在EC2上托管的Spark主机对Common Crawl数据进行简单的转换,使用这个指南,我的代码如下:
package ccminer import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object ccminer { val english = "english|en|eng" val spanish = "es|esp|spa|spanish|espanol" val turkish = "turkish|tr|tur|turc" val greek = "greek|el|ell" val italian = "italian|it|ita|italien" val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|") def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*") def main(args: Array[String]): Unit = { if (args.length != 3) { System.err.println("Bad command line") System.exit(-1) } val cluster = "spark://???" val sc = new SparkContext(cluster, "Common Crawl Miner", System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar")) sc.sequenceFile[String, String](args(0)).map { case (k, v) => (langIndep(k), v) } .groupByKey(args(2).toInt) .filter { case (_, vs) => vs.size > 1 } .saveAsTextFile(args(1)) } }
我用以下命令运行它:
sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"
但很快就出现以下错误:
java.lang.OutOfMemoryError: Java heap space at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59) at com.ning.compress.lzf.ChunkEncoder.(ChunkEncoder.java:93) at com.ning.compress.lzf.impl.UnsafeChunkEncoder. (UnsafeChunkEncoder.java:40) at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE. (UnsafeChunkEncoderLE.java:13) at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31) at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44) at com.ning.compress.lzf.LZFOutputStream. (LZFOutputStream.java:61) at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60) at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803) at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471) at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174) at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164) at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
所以我的基本问题是,如何编写一个可以将键分组的Spark任务,可以处理几乎无限量的输入而不会耗尽内存?
Spark在进行groupByKey操作时出现内存溢出的问题。造成这个问题的原因是JVM分配的堆空间不足。虽然可以增加堆大小,但仍受系统能力的限制(不能超过物理内存的数量)。
另一方面,正如homutov所解释的那样,这种情况发生在大规模收集操作中。例如groupByKey、reduceByKey、cartisien + mapToPair等操作将RDD数据收集到一个地方,导致JVM的堆空间不足。
你可以怎么做呢?
根据我的经验,在集群/系统资源有限时,可以使用Spark调优指南。可以增加spark.default.parallelism,直到能够适应集群/系统中的任务(我曾经通过调整并行度,在我的笔记本虚拟机上运行了一个包含14000个实例和1024个特征的KNN实现)。
命令行标志:--conf spark.default.parallelism=4;4是并行度的值
记住,你需要调整这些特性,以获取最有效和避免失败(内存溢出)的设置,以获得Spark最佳的结果。
此外,
记住使用原始数据类型而不是包装类型。使用数组而不是集合。
例:Listvs int[];int[]比List更好
在Spark中,数组可以节省很多宝贵的空间并提高性能。
此外,使用广播变量替代笛卡尔积或任何大型组合任务。
关于何时、如何和为什么使用广播变量的良好介绍可以在以下链接中找到:jaceklaskowski.gitbooks.io/mastering-apache-spark/content/…
错误:spark.defualt.parallelism应为spark.default.parallelism。
Spark在进行groupByKey等shuffle任务时,最常见的java.lang.OutOfMemoryError异常的原因是并行度(parallelism)较低。可以通过设置spark.default.parallelism属性来增加默认并行度。
此外,将分区数设置得太少也会导致这个问题。在这个链接中提到了这一点。
需要注意的是,reduceByKey()方法似乎也会遭遇相同的错误。
因此,解决这个问题的方法是增加并行度,并确保分区数适当。以下是具体操作:
通过设置spark.default.parallelism属性来增加并行度的默认值。可以在配置文件中进行设置。
确保分区数适当,避免将分区数设置得太少。
以上是导致Spark在进行groupByKey任务时出现内存不足的原因以及解决方法。通过增加并行度和适当设置分区数,可以有效避免这个问题。