Spark Streaming - NoSuchMethodError: scala.collection.immutable.Map$.apply
I have a simple Spark Streaming Java program:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class KafkaStreamingExample {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("KafkaStreamingExample").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("dbserver1.inventory.customers");

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );

        stream.foreachRDD(rdd -> {
            rdd.foreach(record -> {
                System.out.println(record.value());
            });
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
The program is compiled against the Java 1.8 SDK and should stream some data from a Kafka topic.
I built an uber JAR, so all dependencies are bundled in it.
I submit it to my Spark 3.3.1 cluster with the following command:
./bin/spark-submit --class KafkaStreamingExample --master yarn --deploy-mode client --executor-memory 2g --num-executors 1 kafka-streaming-example-jar-with-dependencies.jar
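For context, the uber JAR is built with Maven (the -jar-with-dependencies suffix comes from the assembly plugin); the relevant dependency block looks roughly like this (a sketch, not my exact pom — versions are illustrative, and the _2.12 suffix on the artifact IDs is the Scala binary version the artifacts are compiled against):

<!-- Spark streaming API; the cluster provides it at runtime -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.3.1</version>
    <scope>provided</scope>
</dependency>
<!-- Kafka 0.10+ direct-stream integration; bundled into the uber JAR -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
</dependency>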
When the job is submitted, I get the following error:
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.Map$.apply(Lscala/collection/immutable/Seq;)Ljava/lang/Object;
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:68)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:136)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:115)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:156)
    at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at KafkaStreamingExample.main(KafkaStreamingExample.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
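The missing method takes scala.collection.immutable.Seq, which is the shape of Map.apply's varargs when compiled against Scala 2.13 (on 2.12 it is scala.collection.Seq), so I suspect the Scala binary version bundled in my JAR doesn't match the one in the cluster's Spark build. Two checks that should narrow it down (the second assumes the assembly shaded scala-library in and kept its library.properties at the JAR root):

# Scala version of the cluster's Spark build (printed in the version banner)
./bin/spark-submit --version

# Scala version bundled inside the uber JAR, if scala-library was shaded in
unzip -p kafka-streaming-example-jar-with-dependencies.jar library.properties | grep version.number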
What should I change to get past this error?