使用join时,Spark的迭代时间呈指数增长。

10 浏览
0 Comments

使用join时,Spark的迭代时间呈指数增长。

我对Spark还比较陌生,正在尝试使用迭代算法(期望最大化)来实现以马尔可夫模型表示的聚类。所以我需要进行迭代和连接操作。

我遇到的一个问题是,每次迭代的时间呈指数增长。

经过一些试验,我发现在进行迭代时,需要持久化要在下一次迭代中重用的RDD,否则每次迭代Spark都会创建一个执行计划,重新计算RDD,从而增加计算时间。

init = sc.parallelize(xrange(10000000), 3)
init.cache()
for i in range(6):
    print i
    start = datetime.datetime.now()
    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
#     init.cache()
    print init.count()    
    print str(datetime.datetime.now() - start)

结果为:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993

因此,添加cache()有所帮助,并且迭代时间变为恒定。

init = sc.parallelize(xrange(10000000), 3)
init.cache()
for i in range(6):
    print i
    start = datetime.datetime.now()
    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
    init.cache()
    print init.count()    
    print str(datetime.datetime.now() - start)
0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368

但是,在迭代中进行连接操作时,问题又出现了。

下面是一些简单的代码来演示这个问题。即使在每个RDD转换上进行缓存也无法解决问题:

init = sc.parallelize(xrange(10000), 3)
init.cache()
for i in range(6):
    print i
    start = datetime.datetime.now()
    init2 = init.map(lambda n: (n, n*3))
    init2.cache()
    init3 = init.map(lambda n: (n, n*2))
    init3.cache()
    init4 = init2.join(init3)
    init4.count()
    init4.cache()
    init = init4.map(lambda n: n[0])
    init.cache()
    print init.count()    
    print str(datetime.datetime.now() - start)

以下是输出结果。如您所见,迭代时间呈指数增长:(

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815

真的非常感谢任何帮助 🙂

0
0 Comments

问题:使用join时,Spark迭代时间呈指数增长。

原因:Rdds是不可变的。尝试使用rdd = rdd.cache()对其进行缓存。

解决方法:使用rdd = rdd.cache()对Rdds进行缓存。

0
0 Comments

Spark迭代时间使用join时呈指数增长的问题是由于没有指定分区数而导致的。分区数可以无限增长。解决该问题的方法有两种。

方法1:

如zero323所指出的,可以在调用join时手动指定分区数。例如:

rdd1.join(rdd2, numPartitions)

这将确保分区数不超过numPartitions,尤其是分区数不会不断增长。

方法2:

在创建SparkConf时,可以指定默认的并行程度。如果设置了该值,那么在调用join等函数时,如果没有指定numPartitions,将使用默认的并行程度,从而限制分区数并防止其增长。可以通过以下方式设置该参数:

conf=SparkConf.set("spark.default.parallelism", numPartitions)
sc = SparkContex(conf=conf)

0
0 Comments

问题:使用join操作时,Spark迭代时间呈指数增长。

原因:问题的出现是由于缺乏对RDD血统长度的控制,每次迭代中分区的数量都会随着self-join操作而增加,导致呈指数增长的模式。要解决这个问题,可以控制每次迭代中分区的数量,或者使用全局工具如spark.default.parallelism。第一种方法提供了更多的控制,并且不会影响代码的其他部分。

解决方法:可以通过控制每次迭代中分区的数量或者在每次迭代中对RDD进行重新分区来解决这个问题。具体的解决方法如下:

1. 创建一个用于收集统计信息的辅助函数get_stats。

2. 创建一个用于处理缓存和分区的辅助函数procRDD。

3. 提取流水线逻辑的部分代码。

4. 创建初始数据。

5. 使用join操作时,如果没有提供numPartitions参数,根据输入RDD的分区数量调整输出的分区数量。

6. 使用哈希分区器来确保join操作的性能。

7. 将缓存作为参考进行测试。

8. 通过使用较大的分区数量和哈希分区器来检查是否可以提高性能。

9. 对于DataFrames和iterativeUnions操作,可以采用类似的方法进行解决。

这个问题的解决方法可以帮助Spark用户在使用join操作时避免迭代时间呈指数增长的问题,提高代码的性能和效率。

0