如何计算Spark程序的执行速度
如何计算Spark程序的执行速度
由于懒惰,我想计时我的Spark程序的执行速度,但这很困难。让我们考虑一下这段(无意义的)代码:
var graph = GraphLoader.edgeListFile(context, args(0)) val graph_degs = graph.outerJoinVertices(graph.degrees).triplets.cache /* 我需要在这里开始计时 */ val t1 = System.currentTimeMillis val edges = graph_degs.flatMap(trip => { /* 做一些事情 */ }) .union(graph_degs) val count = edges.count val t2 = System.currentTimeMillis /* 我需要在这里停止计时 */ println("计数 " + count + " 花费了 " + t2-t1)
问题是,转换操作是惰性的,所以在val count = edges.count
行之前没有进行任何评估。但根据我的观点,尽管上面的代码没有值,t1
仍然得到了一个值...尽管代码中的位置,t1
之前的代码在计时器开始后得到了评估。这是一个问题...
在Spark Web UI中,我找不到任何有关它的有趣信息,因为我需要在那行特定代码之后花费的时间。你认为是否有一个简单的解决方案可以查看一组转换何时真正评估?
Spark程序的执行速度是一个重要的指标,但是在Spark中,连续的转换操作(在同一个任务中)被视为一个整体,Spark无法对它们进行单独的测量。因此,我们需要找到一种方法来测量每个转换操作的执行时间。一种解决方法是使用累加器来累计每次转换操作的执行时间,并在最后统计总时间。下面是一个示例代码:
// 创建一个累加器 val durationAccumulator = sc.longAccumulator("flatMapDuration") // 在转换操作中添加时间测量,并将时间添加到累加器中 val edges = rdd.flatMap(trip => { val t1 = System.currentTimeMillis() val result = doSomething(trip) val t2 = System.currentTimeMillis() durationAccumulator.add(t2 - t1) result }) // 执行触发计算的操作 val count = edges.count() // 打印累加器的值 println("执行flatMap操作共花费了 " + durationAccumulator.value + " 毫秒,共计处理了 " + count + " 个记录")
你可以对每个单独的转换操作都重复这个过程。需要注意的是,累加器对于重试等情况是敏感的,因为重试的任务会更新累加器两次。
为了使代码更可重用,你可以创建一个名为`measure`的函数,它可以包装任何函数,并更新给定的累加器:
def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => { val t1 = System.currentTimeMillis() val result = action(input) val t2 = System.currentTimeMillis() acc.add(t2 - t1) result } // 使用这个函数来测量任何转换操作 rdd.flatMap(measure(doSomething, durationAccumulator))
需要注意的是,这种方法无法包括Spark在洗牌和计数过程中花费的时间,如果想要获取这部分时间的信息,可以使用Spark UI。
另外,某些情况下了使用`spark.time(myFunction)`来测量执行时间的方法,但是这种方法只能在Driver上运行,无法用于转换分布式数据的函数。