Scala Spark在Dataframe上的迭代操作导致OoM和其他错误。

8 浏览
0 Comments

Scala Spark在Dataframe上的迭代操作导致OoM和其他错误。

我有一个包含20列和25条记录的数据框(小型标准数据,文件大小为7KB)。我需要在循环中对数据框执行多个操作。循环在几秒钟内正常工作,如预期的那样。问题是当循环结束后,我尝试显示(show())或将数据写入磁盘时,我的CPU高度占用了很多分钟(15-20分钟),内存使用率也很高。很多时候我会遇到stackoverflow(堆栈溢出)或outofmemory(内存不足)错误。

我的main()方法如下:

val spark = get_spark()
val i_file = args(1)
val df = spark.read
  .format("csv")
  .option("delimiter", ",")
  .option("header", true)
  .option("inferSchema","true")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .load(i_file)
var s_df = something.calculate(spark,df)
/////////问题从这里开始///////////
s_df.repartition(col("cluster_id"))    //即使不进行repartition也尝试过
  .write.mode("overwrite")
  .option("header", "true").csv("C:\\Workspace\\data\\out.csv")    

以及我的something中的calculate()方法:

def calculate(spark: SparkSession, df: DataFrame): DataFrame = {
  var newdf = init(spark, df) //对数据框进行一些初始化操作
  val t_count = 100
  var t_i = 0
  newdf.rdd.cache()
  while (t_i < t_count) {
    if(某个条件){
      newdf = calculate_type1(spark, newdf)
    }else{
      newdf = calculate_type2(spark, newdf)
    }
    t_i = t_i + 1
    if(t_i === 50){
      newdf.rdd.checkpoint()
    }
  }
  return newdf
}

我的分析:

  1. 注意到它可以在较少的循环中工作,例如t_count = 2,一切都正常。
  2. 我认为问题是Spark将图形保留在内存中,并尝试处理图形以生成最终的数据框。
  3. 我正在使用var,这是不正确的,不知怎么地我应该使用val,并使用leftfold或zip来更新原始的rdd。但是我在这方面遇到了困难。有人可以帮忙吗?非常感谢!
  4. 我需要使用checkpoint吗?我没有看到它有任何用途。
0