使用多进程池读取CSV文件比使用CSV阅读器花费的时间更长。
使用多进程池读取CSV文件比使用CSV阅读器花费的时间更长。
根据我们一个客户的要求,我需要开发一个应用程序,能够处理巨大的CSV文件。文件的大小可能在10MB到2GB之间。
根据文件大小,模块决定是使用Multiprocessing pool
还是使用普通的CSV reader
来读取文件。
但是观察结果显示,对于文件大小为100MB的情况下,使用多进程
比使用普通的CSV读取
花费更多时间。
这种行为正确吗?或者我做错了什么吗?
以下是我的代码:
def set_file_processing_mode(self, fpath): """ """ fsize = self.get_file_size(fpath) if fsize > FILE_SIZE_200MB: self.read_in_async_mode = True else: self.read_in_async_mode = False def read_line_by_line(self, filepath): """按行读取CSV""" with open(filepath, 'rb') as csvin: csvin = csv.reader(csvin, delimiter=',') for row in iter(csvin): yield row def read_huge_file(self, filepath): """分块读取文件""" pool = mp.Pool(1) for chunk_number in range(self.chunks): #self.chunks = 20 proc = pool.apply_async(read_chunk_by_chunk, args=[filepath, self.chunks, chunk_number]) reader = proc.get() yield reader pool.close() pool.join() def iterate_chunks(self, filepath): """逐块读取大文件行""" for chunklist in self.read_huge_file(filepath): for row in chunklist: yield row @timeit #-- 自定义装饰器 def read_csv_rows(self, filepath): """读取CSV行并进行处理""" if self.read_in_async_mode: print("以异步模式读取") for row in self.iterate_chunks(filepath): self.process(row) else: print("以同步模式读取") for row in self.read_line_by_line(filepath): self.process(row) def process(self, formatted_row): """仅打印行""" self.log(formatted_row) def read_chunk_by_chunk(filename, number_of_blocks, block): ''' 一个将文件分块并迭代一个块中的行的生成器。 ''' results = [] assert 0 <= block and block < number_of_blocks assert 0 < number_of_blocks with open(filename) as fp : fp.seek(0,2) file_size = fp.tell() ini = file_size * block / number_of_blocks end = file_size * (1 + block) / number_of_blocks if ini <= 0: fp.seek(0) else: fp.seek(ini-1) fp.readline() while fp.tell() < end: results.append(fp.readline()) return results if __name__ == '__main__': classobj.read_csv_rows(sys.argv[1])
以下是一个测试:
$ python csv_utils.py "input.csv" 以异步模式读取 在3.75秒内完成 $ python csv_utils.py "input.csv" 以同步模式读取 在0.96秒内完成
问题是:为什么异步模式花费的时间更长?
注:为了避免代码复杂化,删除了不必要的函数/行。
问题的出现的原因是在使用多进程池处理CSV文件时耗时较长。解决方法是将文件分割成多个部分,然后使用多进程池进行处理。通过这种方式,可以并行处理多个文件,从而提高处理速度。
在这个问题中,首先需要确定哪些部分可以并行处理,然后按顺序读取文件并将其分割成多个部分。接下来,使用多进程来并行处理这些部分。
具体的代码示例如下:
import multiprocessing as mp def process(rows): # 进行处理 ... return result if __name__ == '__main__': pool = mp.Pool(N) # N > 1 chunks = get_chunks(...) # 获取文件的分块 for rows in chunks: result += pool.apply_async(process, rows) pool.close() pool.join()
需要注意的是,这里没有定义`get_chunks`函数,因为有多种方法可以实现文件的分块,可以根据自己的需求选择适合的方法。
总结起来,根据处理每个文件的需求,顺序处理可能是最快的方法,因为处理部分无法从并行中获得很大的收益。但是如果有多个可以并行处理的文件,使用多进程是一个很好的选择。可以使用上述代码示例将文件分块并在多进程中处理。这样可以提高处理速度。
希望以上内容对你有所帮助。