如何确保我的Apache Spark设置代码只运行一次?
如何确保我的Apache Spark设置代码只运行一次?
我正在使用Scala编写Spark作业,读取S3上的Parquet文件,进行一些简单的转换,然后将它们保存到DynamoDB实例中。每次运行时,我们需要在Dynamo中创建一个新表,因此我编写了一个负责表创建的Lambda函数。我的Spark作业的第一件事是生成一个表名,调用我的Lambda函数(将新表名传递给它),等待表被创建,然后正常进行ETL步骤。
然而,看起来我的Lambda函数被调用了两次。我不能解释这一点。以下是一些代码的示例:
def main(spark: SparkSession, pathToParquet: String) { // generate a unique table name val tableName = generateTableName() // call the lambda function val result = callLambdaFunction(tableName) // wait for the table to be created waitForTableCreation(tableName) // normal ETL pipeline var parquetRDD = spark.read.parquet(pathToParquet) val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)]) transformedRDD.saveAsHadoopDataset(getConfiguration(tableName)) spark.sparkContext.stop() }
等待表创建的代码非常简单,如下所示:
def waitForTableCreation(tableName: String) { val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient() val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists() try { waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName))) } catch { case ex: WaiterTimedOutException => LOGGER.error("Timed out waiting to create table: " + tableName) throw ex case t: Throwable => throw t } }
而Lambda调用同样简单:
def callLambdaFunction(tableName: String) { val myLambda = LambdaInvokerFactory.builder() .lambdaClient(AWSLambdaClientBuilder.defaultClient) .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME)) .build(classOf[MyLambdaContract]) myLambda.invoke(new MyLambdaInput(tableName)) }
就像我所说的,当我在此代码上运行spark-submit
时,它确实会触发Lambda函数。但我无法解释为什么它会调用两次。结果是我在DynamoDB中得到了两个表格。
等待步骤似乎也会在运行作为Spark作业的上下文中失败。但是当我对等待代码进行单元测试时,它似乎能够很好地工作。它成功地阻塞,直到表准备就绪。
起初,我猜测可能spark-submit
正在将此代码发送到所有工作节点,并且它们独立运行整个过程。一开始,我有一个带有1个主节点和2个工作节点的Spark集群。然而,我在另一个带有1个主节点和5个工作节点的集群上测试了这一点,那里它确实精确地触发了Lambda函数两次,然后显然在调用Lambda后很快就死亡了,因为它未能等待表被创建。
有人有任何线索,说明Spark可能在做什么吗?我是不是漏了什么明显的东西?
更新:这是我的spark-submit参数,在EMR的步骤选项卡中可见。
spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar
这是我的getConfiguration
函数的代码:
def getConfiguration(tableName: String) : JobConf = { val conf = new Configuration() conf.set("dynamodb.servicename", "dynamodb") conf.set("dynamodb.input.tableName", tableName) conf.set("dynamodb.output.tableName", tableName) conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com") conf.set("dynamodb.regionid", "us-east-1") conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") new JobConf(conf) }
此外,这里是一个包含我尝试运行它时看到的某些异常日志的Gist。
我在集群模式下也遇到了相同的问题(v2.4.0)。我通过使用SparkLauncher以编程方式启动我的应用程序来解决它,而不是使用spark-submit.sh。您可以将您的Lambda逻辑移入启动您的Spark应用程序的主方法中,如下所示:
def main(args: Array[String]) = { // generate a unique table name val tableName = generateTableName() // call the lambda function val result = callLambdaFunction(tableName) // wait for the table to be created waitForTableCreation(tableName) val latch = new CountDownLatch(1); val handle = new SparkLauncher(env) .setAppResource("/path/to/spark-app.jar") .setMainClass("com.company.SparkApp") .setMaster("yarn") .setDeployMode("cluster") .setConf("spark.executor.instances", "2") .setConf("spark.executor.cores", "2") // other conf ... .setVerbose(true) .startApplication(new SparkAppHandle.Listener { override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = { latch.countDown() } override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = { } }) println("app is launching...") latch.await() println("app exited") }
感谢 @soapergem 添加日志记录和选项。我添加了一个回答(一个尝试),因为它可能比评论稍微长一点 🙂
总结一下:
- 使用
spark-submit
和配置选项没有什么奇怪的事情 - 在 https://gist.github.com/soapergem/6b379b5a9092dcd43777bdec8dee65a8#file-stderr-log 中,您可以看到应用程序被执行了两次。它从 ACCEPTED 到 RUNNING 状态通过了两次。这与 EMR 默认设置一致(如何防止 EMR Spark 步骤重试?)。为了确认这一点,您可以检查在执行步骤后是否创建了 2 个表(我假设您正在生成具有动态名称的表;每次执行一个不同的名称,在重试的情况下应该给出 2 个不同的名称)
关于您的最后一个问题:
看起来我的代码在 "client" 部署模式下运行,而不是 "cluster" 部署模式下运行,它可能有效?这对在这里的任何人提供了任何提示吗?
有关差异的更多信息,请检查 https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html 在您的情况下,看起来在客户端模式下执行 spark-submit
的机器具有不同的 IAM 策略,而不是 EMR 作业流。我在这里的假设是,您的作业流角色不允许使用 dynamodb:Describe*
,这就是为什么您会收到 500 code
异常(从您的gist中):
Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857) at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129) at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126) at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)
为了确认这个假设,您可以在本地执行创建表和等待创建的操作(这里没有 Spark 代码,只有您的主函数的简单 java
命令),并执行以下操作:
- 对于第一次执行,请确保您拥有所有权限。我认为它将是
dynamodb:Describe*
,在Resources: *
上(如果这是原因,则我认为在生产中应使用类似Resources: Test_Emr*
的东西) - 对于第二次执行,请删除
dynamodb:Describe*
,并检查您是否收到了与 gist 中相同的堆栈跟踪