Apache Spark:如何在代码中取消作业并终止运行中的任务?

8 浏览
0 Comments

Apache Spark:如何在代码中取消作业并终止运行中的任务?

我正在一个带有Yarn(版本2.6.0)的Hadoop集群上运行一个Spark应用程序(版本1.6.0),以客户端模式运行。我有一段代码运行一个长时间的计算,如果它运行时间太长,我想终止它(然后运行其他函数)。

这是一个例子:

val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// 设置一个无限循环的操作
val future = sc.parallelize(lst).map(while (true) _).collectAsync()
try {
    Await.result(future, Duration(30, TimeUnit.SECONDS))
    println("success!")
} catch {
    case _:Throwable =>
        future.cancel()
        println("timeout")
}
// 等待1小时以便检查Yarn中的应用程序
Thread.sleep(60*60*1000)
sc.stop()

超时设置为30秒,但是计算是无限的,所以等待结果时会抛出异常,然后捕获异常并取消未完成的计算,并执行备用函数。

这一切都非常完美地工作,除了取消的作业并没有完全终止:在应用程序的Web UI中查看时,作业标记为失败,但我可以看到仍然有正在运行的任务。

当我使用SparkContext.cancelAllJobs或SparkContext.cancelJobGroup时,情况也是一样。问题在于,即使我设法继续我的程序,被取消的作业的运行任务仍然占用着宝贵的资源(最终会使我几乎停止)。

总结一下:如何以一种终止所有运行任务的方式终止Spark作业?(与目前发生的情况相反,停止作业运行新的任务,但让当前正在运行的任务完成)

更新:

在长时间忽略这个问题后,我们找到了一个有点混乱但有效的解决方法。不是从Spark应用程序内部尝试终止适当的Spark Job/Stage,而是在超时发生时记录所有活动阶段的阶段ID,并向用于终止这些阶段的Spark Web UI呈现的URL发出HTTP GET请求。

0