如何确保我的Apache Spark设置代码只运行一次?

10 浏览
0 Comments

如何确保我的Apache Spark设置代码只运行一次?

我正在使用Scala编写Spark作业,读取S3上的Parquet文件,进行一些简单的转换,然后将它们保存到DynamoDB实例中。每次运行时,我们需要在Dynamo中创建一个新表,因此我编写了一个负责表创建的Lambda函数。我的Spark作业的第一件事是生成一个表名,调用我的Lambda函数(将新表名传递给它),等待表被创建,然后正常进行ETL步骤。

然而,看起来我的Lambda函数被调用了两次。我不能解释这一点。以下是一些代码的示例:

def main(spark: SparkSession, pathToParquet: String) {
  // generate a unique table name
  val tableName = generateTableName()
  // call the lambda function
  val result = callLambdaFunction(tableName)
  // wait for the table to be created
  waitForTableCreation(tableName)
  // normal ETL pipeline
  var parquetRDD = spark.read.parquet(pathToParquet)
  val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
  transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
  spark.sparkContext.stop()
}

等待表创建的代码非常简单,如下所示:

def waitForTableCreation(tableName: String) {
  val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
  val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
  try {
    waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
  } catch {
      case ex: WaiterTimedOutException =>
        LOGGER.error("Timed out waiting to create table: " + tableName)
        throw ex
      case t: Throwable => throw t
  }
}

而Lambda调用同样简单:

def callLambdaFunction(tableName: String) {
  val myLambda = LambdaInvokerFactory.builder()
    .lambdaClient(AWSLambdaClientBuilder.defaultClient)
    .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
    .build(classOf[MyLambdaContract])
  myLambda.invoke(new MyLambdaInput(tableName))
}

就像我所说的,当我在此代码上运行spark-submit时,它确实会触发Lambda函数。但我无法解释为什么它会调用两次。结果是我在DynamoDB中得到了两个表格。

等待步骤似乎也会在运行作为Spark作业的上下文中失败。但是当我对等待代码进行单元测试时,它似乎能够很好地工作。它成功地阻塞,直到表准备就绪。

起初,我猜测可能spark-submit正在将此代码发送到所有工作节点,并且它们独立运行整个过程。一开始,我有一个带有1个主节点和2个工作节点的Spark集群。然而,我在另一个带有1个主节点和5个工作节点的集群上测试了这一点,那里它确实精确地触发了Lambda函数两次,然后显然在调用Lambda后很快就死亡了,因为它未能等待表被创建。

有人有任何线索,说明Spark可能在做什么吗?我是不是漏了什么明显的东西?

更新:这是我的spark-submit参数,在EMR的步骤选项卡中可见。

spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar

这是我的getConfiguration函数的代码:

def getConfiguration(tableName: String) : JobConf = {
  val conf = new Configuration()
  conf.set("dynamodb.servicename", "dynamodb")
  conf.set("dynamodb.input.tableName", tableName)
  conf.set("dynamodb.output.tableName", tableName)
  conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
  conf.set("dynamodb.regionid", "us-east-1")
  conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  new JobConf(conf)
}

此外,这里是一个包含我尝试运行它时看到的某些异常日志的Gist

admin 更改状态以发布 2023年5月20日
0
0 Comments

我在集群模式下也遇到了相同的问题(v2.4.0)。我通过使用SparkLauncher以编程方式启动我的应用程序来解决它,而不是使用spark-submit.sh。您可以将您的Lambda逻辑移入启动您的Spark应用程序的主方法中,如下所示:

def main(args: Array[String]) = {
    // generate a unique table name
    val tableName = generateTableName()
    // call the lambda function
    val result = callLambdaFunction(tableName)
    // wait for the table to be created
    waitForTableCreation(tableName)
    val latch = new CountDownLatch(1);
    val handle = new SparkLauncher(env)
        .setAppResource("/path/to/spark-app.jar")
        .setMainClass("com.company.SparkApp")
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf("spark.executor.instances", "2")
        .setConf("spark.executor.cores", "2")
        // other conf ... 
        .setVerbose(true)
        .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = {
                latch.countDown()
            }
            override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {
            }
        })  
    println("app is launching...")
    latch.await()
    println("app exited")
}

0
0 Comments

感谢 @soapergem 添加日志记录和选项。我添加了一个回答(一个尝试),因为它可能比评论稍微长一点 🙂

总结一下:

关于您的最后一个问题:

看起来我的代码在 "client" 部署模式下运行,而不是 "cluster" 部署模式下运行,它可能有效?这对在这里的任何人提供了任何提示吗?

有关差异的更多信息,请检查 https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html 在您的情况下,看起来在客户端模式下执行 spark-submit 的机器具有不同的 IAM 策略,而不是 EMR 作业流。我在这里的假设是,您的作业流角色不允许使用 dynamodb:Describe*,这就是为什么您会收到 500 code 异常(从您的gist中):

Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)

为了确认这个假设,您可以在本地执行创建表和等待创建的操作(这里没有 Spark 代码,只有您的主函数的简单 java 命令),并执行以下操作:

  • 对于第一次执行,请确保您拥有所有权限。我认为它将是 dynamodb:Describe*,在 Resources: * 上(如果这是原因,则我认为在生产中应使用类似 Resources: Test_Emr* 的东西)
  • 对于第二次执行,请删除 dynamodb:Describe*,并检查您是否收到了与 gist 中相同的堆栈跟踪
0