如何改进我Spark应用程序中的reducebykey部分?
如何改进我Spark应用程序中的reducebykey部分?
我有64个Spark核心。我有超过8000万行的数据,占据了我的Cassandra集群的4.2 GB空间。现在我需要82秒来处理这些数据。我希望将这个时间缩短到8秒。你对此有什么想法吗?这个目标可行吗?谢谢。
这是我想改进的Spark应用的部分代码:
axes = sqlContext.read.format("org.apache.spark.sql.cassandra")\ .options(table="axes", keyspace=source, numPartitions="192").load()\ .repartition(64*3)\ .reduceByKey(lambda x,y:x+y,52)\ .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)]))\ .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \ .filter(lambda x:len(x[1])>=2) \ .map(lambda x:x[1][-1])
编辑:
这是我目前运行的代码,上面发布的代码只是一个实验,对于造成的混淆抱歉。上述问题与此代码有关。
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(64*3) \ .map(lambda x:(x.article,[Row(article=x.article,at=x.at,comments=x.comments,likes=x.likes,reads=x.reads,shares=x.shares)])).reduceByKey(lambda x,y:x+y)\ .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \ .filter(lambda x:len(x[1])>=2) \ .map(lambda x:x[1][-1])
谢谢。
问题:如何改进我的Spark应用程序中的reduceByKey部分?
原因:下面的代码存在一些问题,导致reduceByKey部分无法正确工作。
解决方法:
1. 第一步中的代码创建了一个Spark DataFrame,但是numPartitions选项可能不被识别,需要注意。
2. 第二步中的代码没有实际工作,只是将数据重新分区,这样做不会有任何作用。
3. 在第三步中,代码切换到RDD,但是由于Row是tuple的子类,而reduceByKey只能用于成对的RDD,所以每个元素必须是大小为2的tuple。此外,选择52个分区的原因不明。
4. 在第四步中,由于reduceByKey始终返回大小为2的tuple的RDD,所以以下部分代码将不起作用。特别是,x不能具有像article或comments这样的属性。此外,创建大小为1的列表没有必要。如果存在一些过时的列,应该在将数据转换为RDD之前过滤掉这些列,以避免过多的流量和减少内存使用。如果没有过时的列,则没有理由通过创建新对象增加对Python GC的压力。
5. 在第五步中,由于x[1]只有一个元素,对其进行排序没有意义。
6. 在第六步中,该过滤器将始终返回一个空的RDD。
7. 在第七步中,代码没有执行任何有用的操作。
如果使用了这段代码的某个版本,那么问题很可能是问题中显示的顺序混乱,从第四点开始的map代码:
.map(lambda x: (x.article,[Row(....)]))
在reduceByKey之前:
.reduceByKey(lambda x,y:x+y,52)
如果是这种情况,实际上使用.reduceByKey来执行groupByKey,这要么等效于具有所有问题的groupByKey(Python),要么效率较低(Scala)。此外,减少分区数量的操作也是非常可疑的。如果是这种情况,没有充分的理由将数据移出JVM(DataFrame -> RDD转换),并进行相应的序列化和反序列化,即使有,也可以通过使用实际的最大值进行实际的减少来轻松解决。
相关问题:
1. [在Spark DataFrame中找到每个组的最大行](https://stackoverflow.com/q/35218882/1560062)
2. [在reduceByKey中是否更喜欢使用groupByKey](https://stackoverflow.com/q/33221713/1560062)
希望听到您对此答案的评论。和zero323的想法相同,你们俩都是对的,上面的代码只是一个实验,感谢指出这些问题,帮了很大的忙。我在下面的编辑中发布了实际的代码。期待您的反馈。谢谢!
好吧,我认为我已经对你的代码做出了所有的评论,甚至包括那些不存在的部分 😉 a) 在你知道需要之前,不要进行重新分区 b) 在加载DF后,用上面显示的代码片段替换你的代码(你应该看到第一个性能提升) c) 用上面链接的直接方法之一替换RDD代码 d) 新的开始调优(分区数,代码生成)