如何在Python中使用线程来并行执行AWS S3 API调用?
如何在Python中使用线程来并行执行AWS S3 API调用?
我编写了一个Python脚本,通过使用AWS Boto 3 list_objects()方法来确定所有可用的AWS S3存储桶的总大小。\n逻辑很简单:\n
- \n
- 从每个S3存储桶中获取初始对象列表(自动在1,000个对象之后截断)
- 遍历对象列表中的每个对象,将该对象的大小添加到一个total_size变量中
- 当存储桶仍有其他对象时,检索它们并重复步骤2
\n
\n
\n
\n以下是相关的代码片段:\n
import boto3 s3_client = boto3.client('s3') # 获取由请求的验证发送者拥有的所有S3存储桶 buckets = s3_client.list_buckets() # 对于每个存储桶... for bucket in buckets['Buckets']: # 获取存储桶中的前1,000个对象 bucket_objects = s3_client.list_objects(Bucket=bucket['Name']) # 初始化total_size total_size = 0 # 将存储桶中每个单独项目的大小添加到总大小中 for obj in bucket_objects['Contents']: total_size += obj['Size'] # 如果存储桶有更多对象,则获取它们 while bucket_objects['IsTruncated']: # 获取下一个1,000个对象,从当前列表的最后一个对象之后开始 bucket_objects = s3_client.list_objects( Bucket=bucket['Name'], Marker=bucket_objects['Contents'][-1]['Key']) for obj in bucket_objects['Contents']: total_size += obj['Size'] size_in_MB = total_size/1000000.0 print('存储桶%s中对象的总大小:%.2f MB' % (bucket['Name'], size_in_MB))
\n这段代码在数据大小小于约5 MB的存储桶上运行得相对较快,但是当我遇到一个包含90+ MB数据的存储桶时,执行时间从毫秒级跳升到20-30+秒。\n我希望使用threading模块来并行化代码的I/O部分(从S3获取对象列表),以便在获取它们的线程完成后立即添加所有对象的总大小,而不必按顺序进行检索和添加。\n我知道由于GIL的存在,Python不支持真正的多线程,为了避免得到这样的回答,但我了解到由于这是一个I/O操作而不是CPU密集型操作,threading模块应该能够提高运行时间。\n我问题与我在这里看到的几个线程实现的主要区别在于,我不是在已知的列表或集合上进行迭代。在这里,我必须首先检索对象列表,查看列表是否被截断,然后根据当前列表中最后一个对象的键检索下一个对象列表。\n有人可以解释一下如何改进这段代码的运行时间,或者在这种情况下不可能吗?
问题的出现原因:在处理AWS S3 API调用时,如果需要并行处理大量的API调用,可以使用Python中的线程来实现并行化。
解决方法:可以通过以下步骤来实现并行化的AWS S3 API调用:
1. 首先,使用给定的机制(在这篇帖子中有描述)列出所有子文件夹,并获取子文件夹的列表。
2. 然后,使用获取到的子文件夹列表,将其提交给一个多进程池(或线程池),其中每个工作进程将获取与一个子文件夹对应的所有键,并将它们收集在一个共享容器中,使用多进程管理器进行管理。通过这种方式,可以并行地获取键。
3. 如果键分布均匀且层次结构良好,则上述解决方案将表现最佳;如果数据是扁平组织的,则表现最差。
以下是实现并行化的AWS S3 API调用的Python代码示例:
import multiprocessing import boto3 # 获取子文件夹列表 def get_subfolders(): # 使用给定机制获取子文件夹列表 # ... # 获取特定前缀的键 def get_keys(prefix): # 使用给定前缀获取键 # ... if __name__ == '__main__': # 获取子文件夹列表 subfolders = get_subfolders() # 创建多进程池 pool = multiprocessing.Pool() # 创建多进程管理器 manager = multiprocessing.Manager() shared_container = manager.list() # 提交任务给多进程池 for subfolder in subfolders: pool.apply_async(get_keys, args=(subfolder,), callback=shared_container.append) # 关闭多进程池 pool.close() pool.join() # 打印获取到的键 print(shared_container)
通过上述方法,可以利用Python中的线程来并行化AWS S3 API调用,从而提高处理效率。
在Python中使用线程来并行化AWS S3 API调用的原因是为了提高性能和效率。当上传大量文件到S3存储桶时,使用单线程可能会导致执行时间过长,因此使用多线程可以加速上传过程。
在问题的内容中,提到了一个解决方法,即为每个线程创建一个单独的会话(session)。在原始代码中,只创建了一个会话,导致各个线程之间相互干扰,随机错误发生。通过为每个线程创建单独的会话,可以避免这个问题。
具体的解决方法如下:
将原始代码中的
s3_client = boto3.client('s3')
替换为
s3_client = boto3.session.Session().client('s3')
这样就为每个线程创建了独立的会话,确保线程之间互不干扰。
另外,文章中还提到了多线程的一般性问题。在使用多线程时,需要注意并发访问共享资源可能导致的竞争条件和线程安全问题。
根据作者的经验,他的项目需要上传135,000个文件到S3存储桶。通过使用8个线程,他得到了最佳的性能表现。原本需要3.6小时的任务,现在只需要1.25小时就能完成。
最后,作者对于这个解决方法的有效性表示肯定,并表示这个答案对他的帮助很大。
通过使用多线程并创建独立的会话,可以在Python中并行化AWS S3 API调用,提高上传大量文件的性能和效率。