在使用mapPartitionsToPair / PairFlatMapFunction时如何返回一个迭代器。

8 浏览
0 Comments

在使用mapPartitionsToPair / PairFlatMapFunction时如何返回一个迭代器。

在使用Spark的mapPartitionsToPair / PairFlatMapFunction时,我在互联网上找到了一个例子:\n

spark.read().textFile(hdfsPath).javaRDD()
.mapPartitionsToPair(new PairFlatMapFunction, String, String>() {
  public Iterable> call(Iterator input) {
    List result = new ArrayList();
    while (input.hasNext()) result.add(doSomeThing(input.next()));
    return result;
  }
});

\n但是在编译时报错:\nreturn type Iterable> is not compatible with Iterator>\n我发现call的声明是:\njava.util.Iterator> call(T t)\n所以call应该返回一个Iterator。\n因此,有人能帮我解决如何在Spark的javaRDD API中返回一个Iterator的问题吗?非常感谢\nPS:\n我尝试了以下的代码,在集群上无效:\n

public Iterator> call(Iterator input) {
  List result = new ArrayList();
  while (input.hasNext()) result.add(doSomeThing(input.next()));
  return result.iterator;
}

0
0 Comments

在使用mapPartitionsToPair / PairFlatMapFunction时,出现问题可能是因为开发环境和集群之间的Spark版本不匹配。从Spark-2.0.0开始,Java RDD的flatMap和mapPartitions函数返回的是Java迭代器而不是可迭代对象。因此,如果集群的Spark版本低于2.0.0,则开发时应使用相同的Spark版本。

对于Spark-2.0.0或更高版本,可以使用以下代码解决问题:

public Iterator> call(Iterator input) {
    List result = new ArrayList();
    while (input.hasNext()) {
        result.add(doSomeThing(input.next()));
    }
    return result.iterator();
}

对于Spark版本低于2.0.0,可以使用以下代码解决问题:

public Iterable> call(Iterator input) {
    List result = new ArrayList();
    while (input.hasNext()) {
        result.add(doSomeThing(input.next()));
    }
    return result;
}

mapPartitionsToPair的用途是将分区的每个元素转换为键值对(Tuple2)。根据需求,我们可以使用该函数进行转换。

参考链接:[apache spark map vs mappartitions](https://stackoverflow.com/questions/21185092)

0