Apache Spark:如何在代码中取消作业并终止运行中的任务?
Apache Spark:如何在代码中取消作业并终止运行中的任务?
我正在一个带有Yarn(版本2.6.0)的Hadoop集群上运行一个Spark应用程序(版本1.6.0),以客户端模式运行。我有一段代码运行一个长时间的计算,如果它运行时间太长,我想终止它(然后运行其他函数)。
这是一个例子:
val conf = new SparkConf().setAppName("TIMEOUT_TEST") val sc = new SparkContext(conf) val lst = List(1,2,3) // 设置一个无限循环的操作 val future = sc.parallelize(lst).map(while (true) _).collectAsync() try { Await.result(future, Duration(30, TimeUnit.SECONDS)) println("success!") } catch { case _:Throwable => future.cancel() println("timeout") } // 等待1小时以便检查Yarn中的应用程序 Thread.sleep(60*60*1000) sc.stop()
超时设置为30秒,但是计算是无限的,所以等待结果时会抛出异常,然后捕获异常并取消未完成的计算,并执行备用函数。
这一切都非常完美地工作,除了取消的作业并没有完全终止:在应用程序的Web UI中查看时,作业标记为失败,但我可以看到仍然有正在运行的任务。
当我使用SparkContext.cancelAllJobs或SparkContext.cancelJobGroup时,情况也是一样。问题在于,即使我设法继续我的程序,被取消的作业的运行任务仍然占用着宝贵的资源(最终会使我几乎停止)。
总结一下:如何以一种终止所有运行任务的方式终止Spark作业?(与目前发生的情况相反,停止作业运行新的任务,但让当前正在运行的任务完成)
更新:
在长时间忽略这个问题后,我们找到了一个有点混乱但有效的解决方法。不是从Spark应用程序内部尝试终止适当的Spark Job/Stage,而是在超时发生时记录所有活动阶段的阶段ID,并向用于终止这些阶段的Spark Web UI呈现的URL发出HTTP GET请求。