使用多进程池读取CSV文件比使用CSV阅读器花费的时间更长。

10 浏览
0 Comments

使用多进程池读取CSV文件比使用CSV阅读器花费的时间更长。

根据我们一个客户的要求,我需要开发一个应用程序,能够处理巨大的CSV文件。文件的大小可能在10MB到2GB之间。

根据文件大小,模块决定是使用Multiprocessing pool还是使用普通的CSV reader来读取文件。

但是观察结果显示,对于文件大小为100MB的情况下,使用多进程比使用普通的CSV读取花费更多时间。

这种行为正确吗?或者我做错了什么吗?

以下是我的代码:

def set_file_processing_mode(self, fpath):
   """ """
   fsize = self.get_file_size(fpath)
   if fsize > FILE_SIZE_200MB:
      self.read_in_async_mode = True
   else:
      self.read_in_async_mode = False
def read_line_by_line(self, filepath):
    """按行读取CSV"""
    with open(filepath, 'rb') as csvin:
        csvin = csv.reader(csvin, delimiter=',')
        for row in iter(csvin):
          yield row
def read_huge_file(self, filepath):
    """分块读取文件"""
    pool = mp.Pool(1)
    for chunk_number in range(self.chunks): #self.chunks = 20
        proc = pool.apply_async(read_chunk_by_chunk, 
                        args=[filepath, self.chunks, chunk_number])
        reader = proc.get()
        yield reader
    pool.close()
    pool.join()
def iterate_chunks(self, filepath):
    """逐块读取大文件行"""
    for chunklist in self.read_huge_file(filepath):
        for row in chunklist:
            yield row
@timeit #-- 自定义装饰器
def read_csv_rows(self, filepath):
    """读取CSV行并进行处理"""
    if self.read_in_async_mode:
        print("以异步模式读取")
        for row in self.iterate_chunks(filepath):
            self.process(row)
    else:
        print("以同步模式读取")
        for row in self.read_line_by_line(filepath):
            self.process(row)
def process(self, formatted_row):
    """仅打印行"""
    self.log(formatted_row)
def read_chunk_by_chunk(filename, number_of_blocks, block):
  '''
  一个将文件分块并迭代一个块中的行的生成器。
  '''
  results = []
  assert 0 <= block and block < number_of_blocks
  assert 0 < number_of_blocks
  with open(filename) as fp :
    fp.seek(0,2)
    file_size = fp.tell()
    ini = file_size * block / number_of_blocks
    end = file_size * (1 + block) / number_of_blocks
    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini-1)
        fp.readline()
    while fp.tell() < end:
        results.append(fp.readline())
  return results
if __name__ == '__main__':
    classobj.read_csv_rows(sys.argv[1])    

以下是一个测试:

$ python csv_utils.py "input.csv"
以异步模式读取
在3.75秒内完成
$ python csv_utils.py "input.csv"
以同步模式读取
在0.96秒内完成

问题是:为什么异步模式花费的时间更长?

注:为了避免代码复杂化,删除了不必要的函数/行。

0
0 Comments

问题的出现的原因是在使用多进程池处理CSV文件时耗时较长。解决方法是将文件分割成多个部分,然后使用多进程池进行处理。通过这种方式,可以并行处理多个文件,从而提高处理速度。

在这个问题中,首先需要确定哪些部分可以并行处理,然后按顺序读取文件并将其分割成多个部分。接下来,使用多进程来并行处理这些部分。

具体的代码示例如下:

import multiprocessing as mp
def process(rows):
    # 进行处理
    ...
    return result
if __name__ == '__main__':
    pool = mp.Pool(N) # N > 1
    chunks = get_chunks(...)  # 获取文件的分块
    for rows in chunks:
       result += pool.apply_async(process, rows)
    pool.close()
    pool.join() 

需要注意的是,这里没有定义`get_chunks`函数,因为有多种方法可以实现文件的分块,可以根据自己的需求选择适合的方法。

总结起来,根据处理每个文件的需求,顺序处理可能是最快的方法,因为处理部分无法从并行中获得很大的收益。但是如果有多个可以并行处理的文件,使用多进程是一个很好的选择。可以使用上述代码示例将文件分块并在多进程中处理。这样可以提高处理速度。

希望以上内容对你有所帮助。

0