在从Spark集群收集数据时出现内存不足错误。

18 浏览
0 Comments

在从Spark集群收集数据时出现内存不足错误。

我知道在Spark上有很多关于内存不足错误的问题,但我没有找到解决方法。

我有一个简单的工作流程:

  1. 从Amazon S3读取ORC文件
  2. 筛选出一小部分行
  3. 选择一小部分列
  4. 收集到驱动节点(以便我可以在R中进行其他操作)

当我运行上述操作并将表缓存到Spark内存中时,它只占用不到2GB的空间,与我的集群可用内存相比微不足道,然后当我尝试将数据收集到驱动节点时,就会出现内存不足的错误。

我尝试在以下设置上运行:

  • 在拥有32个核心和244GB RAM的计算机上的本地模式
  • 独立模式下,使用10个6.2GB的执行器和61GB的驱动节点

对于每个设置,我尝试了许多executor.memorydriver.memorydriver.maxResultSize的配置,以覆盖我可用内存范围内的所有可能值,但总是在collect阶段出现内存不足错误;要么是java.lang.OutOfMemoryError: Java heap space

要么是java.lang.OutOfMemoryError : GC overhead limit exceeded,或者是

Error in invoke_method.spark_shell_connection(spark_connection(jobj),  : 
No status is returned.

(这是一个指示内存问题的sparklyr错误)。

根据我(有限的)对Spark的了解,将表缓存后再进行收集应该强制进行所有计算 - 也就是说,如果表在缓存之后愉快地保存在内存中,那么我在驱动节点上进行收集时不需要比2GB多多少内存。

请注意,回答这个问题中有一些建议我尚未尝试,但这些可能会影响性能(例如,序列化RDD),所以如果可能的话,我想避免使用它们。

我的问题:

  1. 一个在缓存后所占用的空间如此之小的数据框如何引起内存问题?
  2. 在我尝试其他可能会影响性能的选项之前,是否有明显的检查/更改/故障排除的方法来帮助解决问题?

谢谢

编辑:回应下面@Shaido的评论,通过Sparklyr调用cache会通过对表执行count(*)来“强制将数据加载到内存中”[来自Sparklyr文档] - 也就是说,表应该保存在内存中,并且所有计算(我相信)在调用collect之前都已经运行。

编辑:自从遵循下面的建议以来,有一些额外的观察结果:

  • 根据下面的评论,我现在尝试将数据写入CSV而不是收集,以了解可能的文件大小。这个操作创建了一组约为3GB的CSV文件,当在缓存之后运行后,只需要2秒钟。
  • 如果我将driver.maxResultSize设置为<1G,我会收到一个错误,指出序列化的RDD的大小为1030MB,大于driver.maxResultSize。
  • 如果我在调用collect后监视任务管理器中的内存使用情况,我会发现使用量一直上升,直到达到约90GB,然后出现内存不足错误。因此,不管原因如何,用于执行collect操作的RAM量约为正在尝试收集的RDD大小的100倍

编辑:如下所请求的评论中添加的代码。

#__________________________________________________________________________________________________________________________________
# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________
firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'
library(dplyr)
library(stringr)
library(sparklyr)
#__________________________________________________________________________________________________________________________________
# Configure & connect to spark
#__________________________________________________________________________________________________________________________________
Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 
config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions
# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')
#__________________________________________________________________________________________________________________________________
# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________
#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++
spark_session(sc) %>%
  invoke("read") %>% 
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
  invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory
#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++
# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1
# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 
# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)
#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++
# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory
  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
            (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
            (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
            (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%
  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%
  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
            !is.na(advertiser_id)) |
           ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
               floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%
  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data))  %>%
  mutate(time_char=as.character(from_unixtime(time)))
# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")
#__________________________________________________________________________________________________________________________________
# Collect out of spark
#__________________________________________________________________________________________________________________________________
myDF <- collect(dftbl)

0
0 Comments

问题的原因:当对Spark集群中的数据进行收集时,会出现内存不足的错误。这是因为在进行收集操作时,有两件事情会发生:首先,所有的数据都必须写入到driver节点的输出中;其次,driver节点必须从所有的节点中收集数据并保存在其内存中。

解决方法:如果只想将数据加载到executor节点的内存中,并可以供其他进程使用,可以使用count()方法,它也是一个会将数据加载到executor节点内存中的操作。如果想要提取数据,则可以尝试在提取数据时使用"--conf spark.driver.maxResultSize=10g"这个参数,同时结合其他属性进行设置。

感谢提供了关于调用collect方法时会发生的情况的信息。至于建议,我已经根据原问题中的描述实施了这两个方法。

0
0 Comments

问题出现的原因是因为在Spark集群中收集数据时出现了内存溢出错误。解决方法是将"collect"方法替换为其他操作,如"count"方法。

在上述内容中提到,"cache"不是一个操作,可以通过使用"persist()"或"cache()"方法将RDD标记为持久化。在首次执行操作时,RDD将保留在节点的内存中。但是,"collect"是一个操作,当调用"collect"时,所有的计算(包括"cache")都会开始执行。

你是在独立模式下运行应用程序,这意味着初始数据加载和所有计算都将在同一内存中进行。大部分内存用于数据下载和其他计算,而不是"collect"。

你可以通过将"collect"替换为"count"来验证这一点。原问题中提到的Sparklyr版本的"cache"会对表执行"count"操作。"cache"(即"count")运行良好,只有在紧接着调用"collect"时才会出现内存溢出错误。

问题的标题似乎是关于Spark,但实际上是关于单独的Sparklyr引擎,它进行了自己的"cache/collect"计算。这可能会引起困惑。

也许,"dataFrame.rdd.count"会导致内存溢出错误,而不是"collect"方法?

文章整理完成。

0