如何将Spark Stream(Dstream / JavaDStream)写入Kafka?

5 浏览
0 Comments

如何将Spark Stream(Dstream / JavaDStream)写入Kafka?

我已经尝试了这段Java代码,并使用了以下最新的maven依赖包。

https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.4.0

以及之前的版本 - https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.1.0

但是下面的代码在Java 1.8和最新的Eclipse Oxygen IDE中无法编译。

---IMPORT SECTION--

import com.github.benfradet.spark.kafka.writer.DStreamKafkaWriter;
import com.github.benfradet.spark.kafka.writer.KafkaWriter;
import org.apache.kafka.common.serialization.StringSerializer;
import scala.Function1;
import scala.Option;
import scala.Serializable;


Map producerConfig = new HashMap();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("key.serializer", StringSerializer.class);
producerConfig.put("value.serializer", StringSerializer.class);
        KafkaWriter kafkaWriter = new DStreamKafkaWriter<>(lines.dstream(), scala.reflect.ClassTag$.MODULE$.apply(String.class));
        Function1> f = new MyFunc>() {
            @Override
            public ProducerRecord apply(final String s) {
                return new ProducerRecord<>("my-topic", s);
            }
        };
        kafkaWriter.writeToKafka(producerConfig, f,Option.empty());

KafkaWriter.writerToKafka(producerConfig,f,Option.empty) --- 这行代码在Eclipse IDE中给出以下错误

spark-kafka-writer-error

任何帮助都将不胜感激。

0
0 Comments

问题的原因是Kafka writer的writeToKafka方法需要一个scala.collection.Map参数,而当前的尝试中使用了一个java.util.Map参数。官方的Java示例中使用了asScala转换,但是不知道它是从哪里来的(这似乎不合法,因为它看起来是使用了scala.collection.JavaConverters中的Scala隐式转换,但是在Java中无法工作)。最简单的解决方法是实例化一个scala.collection.immutable.HashMap,但是建议将作业迁移到使用Scala。如果使用Spark >= v2.2,最简单的写入Kafka的方法是将数据转换为Dataset或DataFrame,并使用DataFrameWriter进行写入。关于将java map转换为scala map,可以使用JavaConverters.mapAsScalaMap(javaMap)方法。

0