使用join时,Spark的迭代时间呈指数增长。
使用join时,Spark的迭代时间呈指数增长。
我对Spark还比较陌生,正在尝试使用迭代算法(期望最大化)来实现以马尔可夫模型表示的聚类。所以我需要进行迭代和连接操作。
我遇到的一个问题是,每次迭代的时间呈指数增长。
经过一些试验,我发现在进行迭代时,需要持久化要在下一次迭代中重用的RDD,否则每次迭代Spark都会创建一个执行计划,重新计算RDD,从而增加计算时间。
init = sc.parallelize(xrange(10000000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init = init2.map(lambda n: n[0]) # init.cache() print init.count() print str(datetime.datetime.now() - start)
结果为:
0 10000000 0:00:04.283652 1 10000000 0:00:05.998830 2 10000000 0:00:08.771984 3 10000000 0:00:11.399581 4 10000000 0:00:14.206069 5 10000000 0:00:16.856993
因此,添加cache()有所帮助,并且迭代时间变为恒定。
init = sc.parallelize(xrange(10000000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init = init2.map(lambda n: n[0]) init.cache() print init.count() print str(datetime.datetime.now() - start) 0 10000000 0:00:04.966835 1 10000000 0:00:04.609885 2 10000000 0:00:04.324358 3 10000000 0:00:04.248709 4 10000000 0:00:04.218724 5 10000000 0:00:04.223368
但是,在迭代中进行连接操作时,问题又出现了。
下面是一些简单的代码来演示这个问题。即使在每个RDD转换上进行缓存也无法解决问题:
init = sc.parallelize(xrange(10000), 3) init.cache() for i in range(6): print i start = datetime.datetime.now() init2 = init.map(lambda n: (n, n*3)) init2.cache() init3 = init.map(lambda n: (n, n*2)) init3.cache() init4 = init2.join(init3) init4.count() init4.cache() init = init4.map(lambda n: n[0]) init.cache() print init.count() print str(datetime.datetime.now() - start)
以下是输出结果。如您所见,迭代时间呈指数增长:(
0 10000 0:00:00.674115 1 10000 0:00:00.833377 2 10000 0:00:01.525314 3 10000 0:00:04.194715 4 10000 0:00:08.139040 5 10000 0:00:17.852815
真的非常感谢任何帮助 🙂
Spark迭代时间使用join时呈指数增长的问题是由于没有指定分区数而导致的。分区数可以无限增长。解决该问题的方法有两种。
方法1:
如zero323所指出的,可以在调用join时手动指定分区数。例如:
rdd1.join(rdd2, numPartitions)
这将确保分区数不超过numPartitions,尤其是分区数不会不断增长。
方法2:
在创建SparkConf时,可以指定默认的并行程度。如果设置了该值,那么在调用join等函数时,如果没有指定numPartitions,将使用默认的并行程度,从而限制分区数并防止其增长。可以通过以下方式设置该参数:
conf=SparkConf.set("spark.default.parallelism", numPartitions) sc = SparkContex(conf=conf)
问题:使用join操作时,Spark迭代时间呈指数增长。
原因:问题的出现是由于缺乏对RDD血统长度的控制,每次迭代中分区的数量都会随着self-join操作而增加,导致呈指数增长的模式。要解决这个问题,可以控制每次迭代中分区的数量,或者使用全局工具如spark.default.parallelism。第一种方法提供了更多的控制,并且不会影响代码的其他部分。
解决方法:可以通过控制每次迭代中分区的数量或者在每次迭代中对RDD进行重新分区来解决这个问题。具体的解决方法如下:
1. 创建一个用于收集统计信息的辅助函数get_stats。
2. 创建一个用于处理缓存和分区的辅助函数procRDD。
3. 提取流水线逻辑的部分代码。
4. 创建初始数据。
5. 使用join操作时,如果没有提供numPartitions参数,根据输入RDD的分区数量调整输出的分区数量。
6. 使用哈希分区器来确保join操作的性能。
7. 将缓存作为参考进行测试。
8. 通过使用较大的分区数量和哈希分区器来检查是否可以提高性能。
9. 对于DataFrames和iterativeUnions操作,可以采用类似的方法进行解决。
这个问题的解决方法可以帮助Spark用户在使用join操作时避免迭代时间呈指数增长的问题,提高代码的性能和效率。