在任何地方导入Spark implicits的解决方法

14 浏览
0 Comments

在任何地方导入Spark implicits的解决方法

我对Spark 2.0和在我们的代码库中使用数据集还不熟悉。我注意到我需要在我们的代码中的每个地方都导入spark.implicits._。例如:

文件A
class A {
    def job(spark: SparkSession) = {
        import spark.implcits._
        //创建数据集ds
        val b = new B(spark)
        b.doSomething(ds)
        doSomething(ds)
    }
    private def doSomething(ds: Dataset[Foo], spark: SparkSession) = {
        import spark.implicits._
        ds.map(e => 1)            
    }
}
文件B
class B(spark: SparkSession) {
    def doSomething(ds: Dataset[Foo]) = {
        import spark.implicits._
        ds.map(e => "SomeString")
    }
}

我想问的是,是否有更简洁的方法来执行ds.map(e => "SomeString"),而不必在每个执行map的函数中都导入隐式转换?如果我不导入它,会出现以下错误:

错误:(53,13)找不到存储在数据集中的编码器类型。导入spark.implicits._支持基本类型(Int,String等)和产品类型(case类)。将来版本中将添加对序列化其他类型的支持。

0
0 Comments

Workaround for importing spark implicits everywhere

在使用Spark进行开发时,经常会遇到在不同的方法中需要使用SparkSession的情况。为了避免重复导入SparkSession的隐式转换,可以使用以下方法来解决这个问题。

首先,在每个调用的方法中重新使用现有的SparkSession,通过在方法内部创建一个本地的val变量,将当前活动的SparkSession赋值给它。

val spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession.active

接下来,可以使用import语句导入spark的隐式转换。

import spark.implicits._

通过以上的解决方法,可以在不同的方法中共享同一个SparkSession,并且无需重复导入隐式转换。这种方法对我来说一直运行良好。

0
0 Comments

问题的出现原因是在使用Spark的时候,需要在每个函数中都进行import操作,这导致import语句过多,不够简洁。解决方法是将import操作放在class或object中进行,以减少import的次数。

具体解决方法如下:

首先,可以在class或object中进行import操作,而不是在每个函数中进行。例如,在"File A"和"File B"的例子中,将import操作放在class A和class B中:

File A:
class A {
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._
    def job() = {
        //create dataset ds
        val b = new B(spark)
        b.doSomething(ds)
        doSomething(ds)
    }
    private def doSomething(ds: Dataset[Foo]) = {
        ds.map(e => 1)            
    }
}
File B:
class B(spark: SparkSession) {
    import spark.implicits._
    def doSomething(ds: Dataset[Foo]) = {    
        ds.map(e => "SomeString")
    }
}

通过这种方式,可以减少import的次数,使代码更加简洁。

然而,据我所知,没有其他方法可以进一步减少import的次数。这是因为在进行实际的import操作时,需要使用到SparkSession对象。因此,这是目前能做到的最好的解决方法。

更新:

更方便的方法是创建一个Scala Trait,并将其与一个空的Object结合使用。这样可以在每个文件的顶部轻松导入implicits,并允许扩展trait以使用SparkSession对象。

例如:

trait SparkJob {
  val spark: SparkSession = SparkSession.builder.
    .master(...)
    .config(..., ....) // Any settings to be applied
    .getOrCreate()
}
object SparkJob extends SparkJob {}

使用这种方法,我们可以在File A和B中进行如下操作:

File A:

import SparkJob.spark.implicits._
class A extends SparkJob {
  spark.sql(...) // Allows for usage of the SparkSession inside the class
  ...
}

File B:

import SparkJob.spark.implicits._
class B extends SparkJob {
  ...    
}

需要注意的是,只有使用spark对象本身的类或对象才需要扩展SparkJob。

这样,可以在每个文件的顶部轻松导入implicits,使代码更加简洁。

0