如何将Spark Stream(Dstream / JavaDStream)写入Kafka?
如何将Spark Stream(Dstream / JavaDStream)写入Kafka?
我已经尝试了这段Java代码,并使用了以下最新的maven依赖包。
https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.4.0
以及之前的版本 - https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.1.0
但是下面的代码在Java 1.8和最新的Eclipse Oxygen IDE中无法编译。
---IMPORT SECTION--
import com.github.benfradet.spark.kafka.writer.DStreamKafkaWriter; import com.github.benfradet.spark.kafka.writer.KafkaWriter; import org.apache.kafka.common.serialization.StringSerializer; import scala.Function1; import scala.Option; import scala.Serializable;
MapproducerConfig = new HashMap (); producerConfig.put("bootstrap.servers", "localhost:9092"); producerConfig.put("key.serializer", StringSerializer.class); producerConfig.put("value.serializer", StringSerializer.class); KafkaWriter kafkaWriter = new DStreamKafkaWriter<>(lines.dstream(), scala.reflect.ClassTag$.MODULE$.apply(String.class)); Function1 > f = new MyFunc >() { @Override public ProducerRecord apply(final String s) { return new ProducerRecord<>("my-topic", s); } }; kafkaWriter.writeToKafka(producerConfig, f,Option.empty());
KafkaWriter.writerToKafka(producerConfig,f,Option.empty) --- 这行代码在Eclipse IDE中给出以下错误
任何帮助都将不胜感激。
问题的原因是Kafka writer的writeToKafka方法需要一个scala.collection.Map参数,而当前的尝试中使用了一个java.util.Map参数。官方的Java示例中使用了asScala转换,但是不知道它是从哪里来的(这似乎不合法,因为它看起来是使用了scala.collection.JavaConverters中的Scala隐式转换,但是在Java中无法工作)。最简单的解决方法是实例化一个scala.collection.immutable.HashMap,但是建议将作业迁移到使用Scala。如果使用Spark >= v2.2,最简单的写入Kafka的方法是将数据转换为Dataset或DataFrame,并使用DataFrameWriter进行写入。关于将java map转换为scala map,可以使用JavaConverters.mapAsScalaMap(javaMap)方法。