Scala Spark在Dataframe上的迭代操作导致OoM和其他错误。
Scala Spark在Dataframe上的迭代操作导致OoM和其他错误。
我有一个包含20列和25条记录的数据框(小型标准数据,文件大小为7KB)。我需要在循环中对数据框执行多个操作。循环在几秒钟内正常工作,如预期的那样。问题是当循环结束后,我尝试显示(show())或将数据写入磁盘时,我的CPU高度占用了很多分钟(15-20分钟),内存使用率也很高。很多时候我会遇到stackoverflow(堆栈溢出)或outofmemory(内存不足)错误。
我的main()方法如下:
val spark = get_spark() val i_file = args(1) val df = spark.read .format("csv") .option("delimiter", ",") .option("header", true) .option("inferSchema","true") .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") .load(i_file) var s_df = something.calculate(spark,df) /////////问题从这里开始/////////// s_df.repartition(col("cluster_id")) //即使不进行repartition也尝试过 .write.mode("overwrite") .option("header", "true").csv("C:\\Workspace\\data\\out.csv")
以及我的something中的calculate()方法:
def calculate(spark: SparkSession, df: DataFrame): DataFrame = { var newdf = init(spark, df) //对数据框进行一些初始化操作 val t_count = 100 var t_i = 0 newdf.rdd.cache() while (t_i < t_count) { if(某个条件){ newdf = calculate_type1(spark, newdf) }else{ newdf = calculate_type2(spark, newdf) } t_i = t_i + 1 if(t_i === 50){ newdf.rdd.checkpoint() } } return newdf }
我的分析:
- 注意到它可以在较少的循环中工作,例如t_count = 2,一切都正常。
- 我认为问题是Spark将图形保留在内存中,并尝试处理图形以生成最终的数据框。
- 我正在使用var,这是不正确的,不知怎么地我应该使用val,并使用leftfold或zip来更新原始的rdd。但是我在这方面遇到了困难。有人可以帮忙吗?非常感谢!
- 我需要使用checkpoint吗?我没有看到它有任何用途。