如何在Python中使用线程来并行执行AWS S3 API调用?

11 浏览
0 Comments

如何在Python中使用线程来并行执行AWS S3 API调用?

我编写了一个Python脚本,通过使用AWS Boto 3 list_objects()方法来确定所有可用的AWS S3存储桶的总大小。\n逻辑很简单:\n

    \n

  1. 从每个S3存储桶中获取初始对象列表(自动在1,000个对象之后截断)
  2. \n

  3. 遍历对象列表中的每个对象,将该对象的大小添加到一个total_size变量中
  4. \n

  5. 当存储桶仍有其他对象时,检索它们并重复步骤2
  6. \n

\n以下是相关的代码片段:\n

import boto3
s3_client = boto3.client('s3')
# 获取由请求的验证发送者拥有的所有S3存储桶
buckets = s3_client.list_buckets()
# 对于每个存储桶...
for bucket in buckets['Buckets']:
    # 获取存储桶中的前1,000个对象
    bucket_objects = s3_client.list_objects(Bucket=bucket['Name'])
    # 初始化total_size
    total_size = 0
    # 将存储桶中每个单独项目的大小添加到总大小中
    for obj in bucket_objects['Contents']:
        total_size += obj['Size']
    # 如果存储桶有更多对象,则获取它们
    while bucket_objects['IsTruncated']:
        # 获取下一个1,000个对象,从当前列表的最后一个对象之后开始
        bucket_objects = s3_client.list_objects(
            Bucket=bucket['Name'],
            Marker=bucket_objects['Contents'][-1]['Key'])
        for obj in bucket_objects['Contents']:
            total_size += obj['Size']
    size_in_MB = total_size/1000000.0
    print('存储桶%s中对象的总大小:%.2f MB'
        % (bucket['Name'], size_in_MB))

\n这段代码在数据大小小于约5 MB的存储桶上运行得相对较快,但是当我遇到一个包含90+ MB数据的存储桶时,执行时间从毫秒级跳升到20-30+秒。\n我希望使用threading模块来并行化代码的I/O部分(从S3获取对象列表),以便在获取它们的线程完成后立即添加所有对象的总大小,而不必按顺序进行检索和添加。\n我知道由于GIL的存在,Python不支持真正的多线程,为了避免得到这样的回答,但我了解到由于这是一个I/O操作而不是CPU密集型操作,threading模块应该能够提高运行时间。\n我问题与我在这里看到的几个线程实现的主要区别在于,我不是在已知的列表或集合上进行迭代。在这里,我必须首先检索对象列表,查看列表是否被截断,然后根据当前列表中最后一个对象的键检索下一个对象列表。\n有人可以解释一下如何改进这段代码的运行时间,或者在这种情况下不可能吗?

0
0 Comments

问题的出现原因:在处理AWS S3 API调用时,如果需要并行处理大量的API调用,可以使用Python中的线程来实现并行化。

解决方法:可以通过以下步骤来实现并行化的AWS S3 API调用:

1. 首先,使用给定的机制(在这篇帖子中有描述)列出所有子文件夹,并获取子文件夹的列表。

2. 然后,使用获取到的子文件夹列表,将其提交给一个多进程池(或线程池),其中每个工作进程将获取与一个子文件夹对应的所有键,并将它们收集在一个共享容器中,使用多进程管理器进行管理。通过这种方式,可以并行地获取键。

3. 如果键分布均匀且层次结构良好,则上述解决方案将表现最佳;如果数据是扁平组织的,则表现最差。

以下是实现并行化的AWS S3 API调用的Python代码示例:

import multiprocessing
import boto3
# 获取子文件夹列表
def get_subfolders():
    # 使用给定机制获取子文件夹列表
    # ...
# 获取特定前缀的键
def get_keys(prefix):
    # 使用给定前缀获取键
    # ...
if __name__ == '__main__':
    # 获取子文件夹列表
    subfolders = get_subfolders()
    # 创建多进程池
    pool = multiprocessing.Pool()
    # 创建多进程管理器
    manager = multiprocessing.Manager()
    shared_container = manager.list()
    # 提交任务给多进程池
    for subfolder in subfolders:
        pool.apply_async(get_keys, args=(subfolder,), callback=shared_container.append)
    # 关闭多进程池
    pool.close()
    pool.join()
    # 打印获取到的键
    print(shared_container)

通过上述方法,可以利用Python中的线程来并行化AWS S3 API调用,从而提高处理效率。

0
0 Comments

在Python中使用线程来并行化AWS S3 API调用的原因是为了提高性能和效率。当上传大量文件到S3存储桶时,使用单线程可能会导致执行时间过长,因此使用多线程可以加速上传过程。

在问题的内容中,提到了一个解决方法,即为每个线程创建一个单独的会话(session)。在原始代码中,只创建了一个会话,导致各个线程之间相互干扰,随机错误发生。通过为每个线程创建单独的会话,可以避免这个问题。

具体的解决方法如下:

将原始代码中的

s3_client = boto3.client('s3')

替换为

s3_client = boto3.session.Session().client('s3')

这样就为每个线程创建了独立的会话,确保线程之间互不干扰。

另外,文章中还提到了多线程的一般性问题。在使用多线程时,需要注意并发访问共享资源可能导致的竞争条件和线程安全问题。

根据作者的经验,他的项目需要上传135,000个文件到S3存储桶。通过使用8个线程,他得到了最佳的性能表现。原本需要3.6小时的任务,现在只需要1.25小时就能完成。

最后,作者对于这个解决方法的有效性表示肯定,并表示这个答案对他的帮助很大。

通过使用多线程并创建独立的会话,可以在Python中并行化AWS S3 API调用,提高上传大量文件的性能和效率。

0