连接到PySpark中的S3数据

9 浏览
0 Comments

连接到PySpark中的S3数据

我正在尝试读取一个来自Amazon S3的JSON文件,以创建一个Spark上下文并用它来处理数据。\nSpark基本上是在一个Docker容器中。所以将文件放在Docker路径中也是很麻烦的。因此,我将它推送到了S3。\n下面的代码解释了其他的内容。\n

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)
config_dict = {"fs.s3n.awsAccessKeyId":"**",
               "fs.s3n.awsSecretAccessKey":"**"}
bucket = "nonamecpp"
prefix = "dataset.json"
filename = "s3n://{}/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
                    'org.apache.hadoop.mapred.TextInputFormat',
                    'org.apache.hadoop.io.Text',
                    'org.apache.hadoop.io.LongWritable',
                    conf=config_dict)

\n我得到了以下错误 -\n

Py4JJavaError                             Traceback (most recent call last)
 in ()
      9                     'org.apache.hadoop.io.Text',
     10                     'org.apache.hadoop.io.LongWritable',
---> 11                     conf=config_dict)
     12 
/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
    558         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
    559                                               valueClass, keyConverter, valueConverter,
--> 560                                               jconf, batchSize)
    561         return RDD(jrdd, self)
    562 
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
    at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
    at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:744)

\n我明确提供了aswSecretAccessKey和awsAccessId。出了什么问题?

0
0 Comments

问题:如何从PySpark连接到S3数据?

原因:在这个问题中,作者提到他使用了实例配置文件凭证(Instance profile credentials)来访问S3数据。实例配置文件凭证是在EC2实例上使用的,通过Amazon EC2元数据服务提供。AWS Java SDK使用InstanceProfileCredentialsProvider来加载这些凭证。

解决方法:对于PySpark,作者使用了一些设置来访问S3内容。他首先定义了一个函数get_spark_context,其中包含了一些配置。然后,通过设置Spark Context的Hadoop配置来指定S3的访问细节。最后,返回一个配置好的SQL Context。

下面是作者给出的示例代码:

def get_spark_context(app_name):
    # configure
    conf = pyspark.SparkConf()
    # init & return
    sc = pyspark.SparkContext.getOrCreate(conf=conf)
    # s3a config
    sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint',
                                      's3.eu-central-1.amazonaws.com')
    sc._jsc.hadoopConfiguration().set(
        'fs.s3a.aws.credentials.provider',
        'com.amazonaws.auth.InstanceProfileCredentialsProvider,'
        'com.amazonaws.auth.profile.ProfileCredentialsProvider'
    )
    return pyspark.SQLContext(sparkContext=sc)

还有更多关于Spark Context的信息,以及有关类型S3访问的问题的链接。

作者还补充了一些关于S3A连接器的信息。S3A连接器的默认行为是将EC2 IAM凭证提供程序添加到其供应商列表中,但它是最慢的,可能会触发限流。标准顺序是:URL中的密钥(已从最新版本中删除),XML或JCEKS文件中的fs.s3a.secret设置,环境变量,IAM角色。Spark-submit还会查找AWS_环境变量,并从中设置s3n和s3a键值。

还有关于限流的一些问题,主要是关于InstanceProfileCredentialsProvider在早期版本中的一个bug,每个实例都是唯一的,现在已经改为在所有线程中使用单例。否则,在单个JVM中的每个文件系统客户端都会命中AWS身份验证服务,而这些服务像其他服务一样有限流。

为了从PySpark连接到S3数据,我们可以使用实例配置文件凭证来访问S3数据,并通过设置Spark Context的Hadoop配置来指定S3的访问细节。此外,我们还可以了解S3A连接器的默认行为和限流问题。

文章内容如上所述。

0
0 Comments

问题的原因是需要连接到S3数据,并在PySpark中执行Spark作业。解决方法是添加hadoop-aws包,并设置AWS凭据。

在spark-submit命令中添加--packages org.apache.hadoop:hadoop-aws:2.7.1将下载所有缺少的hadoop包,以允许执行与S3相关的Spark作业。然后,在作业中设置AWS凭据:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)

另一种设置凭据的选项是将其定义在spark/conf/spark-env文件中:

#!/usr/bin/env bash
AWS_ACCESS_KEY_ID='xxxx'
AWS_SECRET_ACCESS_KEY='xxxx'
SPARK_WORKER_CORES=1 # 设置此机器上要使用的核心数
SPARK_WORKER_MEMORY=1g # 设置工作器拥有的总内存以提供给执行器(例如1000m,2g)
SPARK_EXECUTOR_INSTANCES=10 # 设置每个节点的工作进程数

更多信息:

关于如何从S3文件夹中读取parquet文件的问题,上述代码对我不起作用。

你能展示一下你尝试从parquet文件中读取的方式吗?

sc = SparkContext.getOrCreate()  
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'A') 
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 's') 
sqlContext = SQLContext(sc) 
df2 = sqlContext.read.parquet(s3://path/to/folder)

我看到你正在从一个s3路径而不是s3n路径中读取。在这种情况下,你可以尝试使用fs.s3.awsAccessKeyId和fs.s3.awsSecretAccessKey来设置你的凭据。

0