在任何地方导入Spark implicits的解决方法
在任何地方导入Spark implicits的解决方法
我对Spark 2.0和在我们的代码库中使用数据集还不熟悉。我注意到我需要在我们的代码中的每个地方都导入spark.implicits._
。例如:
文件A class A { def job(spark: SparkSession) = { import spark.implcits._ //创建数据集ds val b = new B(spark) b.doSomething(ds) doSomething(ds) } private def doSomething(ds: Dataset[Foo], spark: SparkSession) = { import spark.implicits._ ds.map(e => 1) } } 文件B class B(spark: SparkSession) { def doSomething(ds: Dataset[Foo]) = { import spark.implicits._ ds.map(e => "SomeString") } }
我想问的是,是否有更简洁的方法来执行ds.map(e => "SomeString")
,而不必在每个执行map的函数中都导入隐式转换?如果我不导入它,会出现以下错误:
错误:(53,13)找不到存储在数据集中的编码器类型。导入spark.implicits._支持基本类型(Int,String等)和产品类型(case类)。将来版本中将添加对序列化其他类型的支持。
Workaround for importing spark implicits everywhere
在使用Spark进行开发时,经常会遇到在不同的方法中需要使用SparkSession的情况。为了避免重复导入SparkSession的隐式转换,可以使用以下方法来解决这个问题。
首先,在每个调用的方法中重新使用现有的SparkSession,通过在方法内部创建一个本地的val变量,将当前活动的SparkSession赋值给它。
val spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession.active
接下来,可以使用import语句导入spark的隐式转换。
import spark.implicits._
通过以上的解决方法,可以在不同的方法中共享同一个SparkSession,并且无需重复导入隐式转换。这种方法对我来说一直运行良好。
问题的出现原因是在使用Spark的时候,需要在每个函数中都进行import操作,这导致import语句过多,不够简洁。解决方法是将import操作放在class或object中进行,以减少import的次数。
具体解决方法如下:
首先,可以在class或object中进行import操作,而不是在每个函数中进行。例如,在"File A"和"File B"的例子中,将import操作放在class A和class B中:
File A: class A { val spark = SparkSession.builder.getOrCreate() import spark.implicits._ def job() = { //create dataset ds val b = new B(spark) b.doSomething(ds) doSomething(ds) } private def doSomething(ds: Dataset[Foo]) = { ds.map(e => 1) } } File B: class B(spark: SparkSession) { import spark.implicits._ def doSomething(ds: Dataset[Foo]) = { ds.map(e => "SomeString") } }
通过这种方式,可以减少import的次数,使代码更加简洁。
然而,据我所知,没有其他方法可以进一步减少import的次数。这是因为在进行实际的import操作时,需要使用到SparkSession对象。因此,这是目前能做到的最好的解决方法。
更新:
更方便的方法是创建一个Scala Trait,并将其与一个空的Object结合使用。这样可以在每个文件的顶部轻松导入implicits,并允许扩展trait以使用SparkSession对象。
例如:
trait SparkJob { val spark: SparkSession = SparkSession.builder. .master(...) .config(..., ....) // Any settings to be applied .getOrCreate() } object SparkJob extends SparkJob {}
使用这种方法,我们可以在File A和B中进行如下操作:
File A:
import SparkJob.spark.implicits._ class A extends SparkJob { spark.sql(...) // Allows for usage of the SparkSession inside the class ... }
File B:
import SparkJob.spark.implicits._ class B extends SparkJob { ... }
需要注意的是,只有使用spark对象本身的类或对象才需要扩展SparkJob。
这样,可以在每个文件的顶部轻松导入implicits,使代码更加简洁。