在使用mapPartitionsToPair / PairFlatMapFunction时如何返回一个迭代器。
在使用mapPartitionsToPair / PairFlatMapFunction时如何返回一个迭代器。
在使用Spark的mapPartitionsToPair / PairFlatMapFunction时,我在互联网上找到了一个例子:\n
spark.read().textFile(hdfsPath).javaRDD() .mapPartitionsToPair(new PairFlatMapFunction, String, String>() { public Iterable > call(Iterator input) { List result = new ArrayList (); while (input.hasNext()) result.add(doSomeThing(input.next())); return result; } });
\n但是在编译时报错:\nreturn type Iterable
\n我发现call
的声明是:\njava.util.Iterator
\n所以call
应该返回一个Iterator。\n因此,有人能帮我解决如何在Spark的javaRDD API中返回一个Iterator的问题吗?非常感谢\nPS:\n我尝试了以下的代码,在集群上无效:\n
public Iterator> call(Iterator input) { List result = new ArrayList (); while (input.hasNext()) result.add(doSomeThing(input.next())); return result.iterator; }
在使用mapPartitionsToPair / PairFlatMapFunction时,出现问题可能是因为开发环境和集群之间的Spark版本不匹配。从Spark-2.0.0开始,Java RDD的flatMap和mapPartitions函数返回的是Java迭代器而不是可迭代对象。因此,如果集群的Spark版本低于2.0.0,则开发时应使用相同的Spark版本。
对于Spark-2.0.0或更高版本,可以使用以下代码解决问题:
public Iterator> call(Iterator input) { List result = new ArrayList (); while (input.hasNext()) { result.add(doSomeThing(input.next())); } return result.iterator(); }
对于Spark版本低于2.0.0,可以使用以下代码解决问题:
public Iterable> call(Iterator input) { List result = new ArrayList (); while (input.hasNext()) { result.add(doSomeThing(input.next())); } return result; }
mapPartitionsToPair的用途是将分区的每个元素转换为键值对(Tuple2
参考链接:[apache spark map vs mappartitions](https://stackoverflow.com/questions/21185092)