在从Spark集群收集数据时出现内存不足错误。
在从Spark集群收集数据时出现内存不足错误。
我知道在Spark上有很多关于内存不足错误的问题,但我没有找到解决方法。
我有一个简单的工作流程:
- 从Amazon S3读取ORC文件
- 筛选出一小部分行
- 选择一小部分列
- 收集到驱动节点(以便我可以在R中进行其他操作)
当我运行上述操作并将表缓存到Spark内存中时,它只占用不到2GB的空间,与我的集群可用内存相比微不足道,然后当我尝试将数据收集到驱动节点时,就会出现内存不足的错误。
我尝试在以下设置上运行:
- 在拥有32个核心和244GB RAM的计算机上的本地模式
- 独立模式下,使用10个6.2GB的执行器和61GB的驱动节点
对于每个设置,我尝试了许多executor.memory
、driver.memory
和driver.maxResultSize
的配置,以覆盖我可用内存范围内的所有可能值,但总是在collect
阶段出现内存不足错误;要么是java.lang.OutOfMemoryError: Java heap space
,
要么是java.lang.OutOfMemoryError : GC overhead limit exceeded
,或者是
Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned.
(这是一个指示内存问题的sparklyr
错误)。
根据我(有限的)对Spark的了解,将表缓存后再进行收集应该强制进行所有计算 - 也就是说,如果表在缓存之后愉快地保存在内存中,那么我在驱动节点上进行收集时不需要比2GB多多少内存。
请注意,回答这个问题中有一些建议我尚未尝试,但这些可能会影响性能(例如,序列化RDD),所以如果可能的话,我想避免使用它们。
我的问题:
- 一个在缓存后所占用的空间如此之小的数据框如何引起内存问题?
- 在我尝试其他可能会影响性能的选项之前,是否有明显的检查/更改/故障排除的方法来帮助解决问题?
谢谢
编辑:回应下面@Shaido的评论,通过Sparklyr调用cache
会通过对表执行count(*)
来“强制将数据加载到内存中”[来自Sparklyr文档] - 也就是说,表应该保存在内存中,并且所有计算(我相信)在调用collect
之前都已经运行。
编辑:自从遵循下面的建议以来,有一些额外的观察结果:
- 根据下面的评论,我现在尝试将数据写入CSV而不是收集,以了解可能的文件大小。这个操作创建了一组约为3GB的CSV文件,当在缓存之后运行后,只需要2秒钟。
- 如果我将
driver.maxResultSize
设置为<1G,我会收到一个错误,指出序列化的RDD的大小为1030MB,大于driver.maxResultSize。 - 如果我在调用
collect
后监视任务管理器中的内存使用情况,我会发现使用量一直上升,直到达到约90GB,然后出现内存不足错误。因此,不管原因如何,用于执行collect
操作的RAM量约为正在尝试收集的RDD大小的100倍。
编辑:如下所请求的评论中添加的代码。
#__________________________________________________________________________________________________________________________________ # Set parameters used for filtering rows #__________________________________________________________________________________________________________________________________ firstDate <- '2017-07-01' maxDate <- '2017-08-31' advertiserID <- '4529611' advertiserID2 <- '4601141' advertiserID3 <- '4601141' library(dplyr) library(stringr) library(sparklyr) #__________________________________________________________________________________________________________________________________ # Configure & connect to spark #__________________________________________________________________________________________________________________________________ Sys.setenv("SPARK_MEM"="100g") Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") config <- spark_config() config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3 Sys.setenv(AWS_ACCESS_KEY_ID="") Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions # Specify memory parameters - have tried lots of different values here! config$`sparklyr.shell.driver-memory` <- '50g' config$`sparklyr.shell.executor-memory` <- '50g' config$spark.driver.maxResultSize <- '50g' sc <- spark_connect(master='local', config=config, version='2.0.1') #__________________________________________________________________________________________________________________________________ # load data into spark from S3 ---- #__________________________________________________________________________________________________________________________________ #+++++++++++++++++++ # create spark table (not in memory yet) of all logfiles within logfiles path #+++++++++++++++++++ spark_session(sc) %>% invoke("read") %>% invoke("format", "orc") %>% invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% invoke("createOrReplaceTempView", "alldatadf") alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory #+++++++++++++++++++ # define variables used to filter table down to daterange #+++++++++++++++++++ # Calculate firstDate & maxDate as unix timestamps unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1 unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1 # Convert daterange params into date_year, date_month & date_day values to pass to filter statement dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1)) years <- unique(substring(dateRange, first=1, last=4)) if(length(years)==1) years <- c(years, years) year_y1 <- years[1]; year_y2 <- years[2] months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7) minMonth_y1 <- min(months_y1) maxMonth_y1 <- max(months_y1) months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7) minMonth_y2 <- min(months_y2) maxMonth_y2 <- max(months_y2) # Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time) firstDateMinusOne <- as.Date(firstDate)-1 firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4) firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10) maxDatePlusOne <- as.Date(maxDate)+1 maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4) maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7) maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10) #+++++++++++++++++++ # Read in data, filter & select #+++++++++++++++++++ # startTime <- proc.time()[3] dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory # filter by month and year, using ORC partitions for extra speed filter(((date_year==year_y1 & date_month>=minMonth_y1 & date_month<=maxMonth_y1) | (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) | (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) | (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>% # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind. filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>% # filter by advertiser ID filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & !is.na(advertiser_id)) | ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>% # Define cols to keep transmute(time=as.numeric(event_time/1000000), user_id=as.character(user_id), action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA), lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id), activity_lookup=as.character(activity_id), sv1=as.character(segment_value_1), other_data=as.character(other_data)) %>% mutate(time_char=as.character(from_unixtime(time))) # cache to memory dftbl <- sdf_register(dftbl, "filtereddf") tbl_cache(sc, "filtereddf") #__________________________________________________________________________________________________________________________________ # Collect out of spark #__________________________________________________________________________________________________________________________________ myDF <- collect(dftbl)
问题的原因:当对Spark集群中的数据进行收集时,会出现内存不足的错误。这是因为在进行收集操作时,有两件事情会发生:首先,所有的数据都必须写入到driver节点的输出中;其次,driver节点必须从所有的节点中收集数据并保存在其内存中。
解决方法:如果只想将数据加载到executor节点的内存中,并可以供其他进程使用,可以使用count()方法,它也是一个会将数据加载到executor节点内存中的操作。如果想要提取数据,则可以尝试在提取数据时使用"--conf spark.driver.maxResultSize=10g"这个参数,同时结合其他属性进行设置。
感谢提供了关于调用collect方法时会发生的情况的信息。至于建议,我已经根据原问题中的描述实施了这两个方法。
问题出现的原因是因为在Spark集群中收集数据时出现了内存溢出错误。解决方法是将"collect"方法替换为其他操作,如"count"方法。
在上述内容中提到,"cache"不是一个操作,可以通过使用"persist()"或"cache()"方法将RDD标记为持久化。在首次执行操作时,RDD将保留在节点的内存中。但是,"collect"是一个操作,当调用"collect"时,所有的计算(包括"cache")都会开始执行。
你是在独立模式下运行应用程序,这意味着初始数据加载和所有计算都将在同一内存中进行。大部分内存用于数据下载和其他计算,而不是"collect"。
你可以通过将"collect"替换为"count"来验证这一点。原问题中提到的Sparklyr版本的"cache"会对表执行"count"操作。"cache"(即"count")运行良好,只有在紧接着调用"collect"时才会出现内存溢出错误。
问题的标题似乎是关于Spark,但实际上是关于单独的Sparklyr引擎,它进行了自己的"cache/collect"计算。这可能会引起困惑。
也许,"dataFrame.rdd.count"会导致内存溢出错误,而不是"collect"方法?
文章整理完成。