使用Python读取/写入Parquet文件而不将其读入内存中

8 浏览
0 Comments

使用Python读取/写入Parquet文件而不将其读入内存中

我查看了我期望能满足我的需求的标准文档(Apache ArrowPandas),但是似乎无法弄清楚。

我最擅长使用Python,所以我希望使用Python,但这不是强制要求。

问题

我需要将Parquet文件从一个位置(URL)移动到另一个位置(Azure存储账户,在这种情况下使用Azure机器学习平台,但这与我的问题无关)。

这些文件太大,无法简单地执行pd.read_parquet("https://my-file-location.parquet"),因为这会将整个文件读入一个对象中。

期望

我认为一定有一种简单的方法可以创建一个文件对象,并逐行或逐列块地流式传输该对象。类似于以下代码:

import pyarrow.parquet as pq
with pq.open("https://my-file-location.parquet") as read_file_handle:
    with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_filehandle:
        for next_line in read_file_handle{
            write_file_handle.append(next_line)

我知道这可能会有些不同,因为Parquet主要是以列为基础进行访问的。也许有一种配置对象,可以指定感兴趣的列,或者可以抓取多少行数据块或类似的内容。

但关键的期望是有一种方式可以访问Parquet文件而无需将其全部加载到内存中。我该如何做到这一点?

顺便提一句,我确实尝试过直接使用Python的标准open函数,但我不确定如何在URL位置和字节流上使用open。如果可以仅通过open来实现而跳过任何特定于Parquet的内容,那也可以。

更新

一些评论建议使用类似于bash的脚本,例如这里。如果没有其他方法,我可以使用这种方法,但这并不理想,因为:

  • 我更希望将所有内容都保留在完整的语言SDK中,无论是Python、Go还是其他语言。如果解决方案转变为带有管道的bash脚本,那就需要一个外部调用,因为最终的解决方案不会完全使用bash、Powershell或任何脚本语言来编写。
  • 我真的希望发挥Parquet本身的一些优势。就像我在下面的评论中提到的,Parquet是列式存储。因此,如果我有一个包含11亿行和100列的“数据框”,但我只关心其中的3列,我希望只下载这3列,从而节省大量时间和一些金钱。
0
0 Comments

问题的原因:在处理Parquet文件时,通常需要将整个文件读入内存中进行操作,这可能会导致内存不足的问题,特别是当文件非常大时。因此,需要一种方法来在不读入内存的情况下,直接对Parquet文件进行读写操作。

解决方法:可以使用Python中的pyarrow库来实现对Parquet文件的读写操作。具体步骤如下:

1. 导入所需的库:首先,需要导入pyarrow库和pyarrow.parquet库。可以使用以下代码进行导入:

import pyarrow as pa
import pyarrow.parquet as pq

2. 读取Parquet文件:使用`pq.ParquetFile`函数读取要处理的Parquet文件。例如,可以使用以下代码读取名为'read.parquet'的Parquet文件:

parquet_file = pq.ParquetFile('read.parquet')

3. 定义新的Parquet模式:根据需要,定义一个新的Parquet模式,即定义Parquet文件中的列名和数据类型。可以使用以下代码定义一个新的模式:

new_schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
    ('c', pa.int32()),
])

4. 创建Parquet写入器:使用`pq.ParquetWriter`函数创建一个新的Parquet写入器,指定要写入的文件名和定义的新模式。例如,可以使用以下代码创建一个名为'write.parquet'的Parquet写入器:

with pq.ParquetWriter('write.parquet', schema=new_schema) as writer:

5. 迭代处理Parquet文件:使用`parquet_file.iter_batches`函数迭代处理Parquet文件的批次数据。可以通过指定`batch_size`参数来控制每个批次的大小。例如,可以使用以下代码迭代处理批次数据:

for batch in parquet_file.iter_batches(batch_size=100000):

6. 转换数据并写入新文件:在每个批次中,将批次数据转换为Pandas数据帧,并对数据进行相应的处理。例如,可以使用以下代码将新的静态列添加到数据帧中,并将数据帧转换为记录批次,然后使用写入器将记录批次写入新的Parquet文件:

df = batch.to_pandas()
df['c'] = 9999999
transformed_batch = pa.RecordBatch.from_pandas(df, schema=new_schema)
writer.write_batch(transformed_batch)

通过以上步骤,可以实现对Parquet文件的读写操作,而无需将整个文件读入内存中。这种方法可以节省内存,并且适用于处理大型Parquet文件时的情况。

另外,对于将CSV文件转换为Parquet文件的情况,可以参考提供的链接:https://stackoverflow.com/a/74258957/6563567

0
0 Comments

读写Parquet文件时,如果不将其读入内存,这是可能的,但需要一些工作,因为除了是列式存储之外,Parquet还需要一个模式。

大致的工作流程是:

  1. 打开一个Parquet文件进行读取。
  2. 然后使用iter_batches逐步读取行的块(还可以传递要从文件中读取的特定列,以节省IO/CPU)。
  3. 然后可以进一步转换来自iter_batches的每个pa.RecordBatch。一旦完成第一个批处理的转换,可以获取其模式并创建一个新的ParquetWriter。
  4. 对于每个转换的批次,调用write_table。首先必须将其转换为pa.Table。
  5. 关闭文件。

Parquet需要随机访问,因此无法轻松地从URI流式传输(如果通过HTTP FSSpec打开文件,则pyarrow应该支持它),但我认为在写入时可能会被阻塞。

此外,请参阅stackoverflow.com/questions/63891231/...中的批处理大小对于内存管理非常重要。

是的,谢谢您!是的,我知道这可能有些棘手,并且需要一个模式。我发现在stackoverflow上,如果我不真的知道我要问什么,写较短的问题有时会更好。;) 不管怎样,iter_batches确实是我正在寻找的东西。我感到很愚蠢没有看到它。我会努力尝试将其放在适当的位置。

0
0 Comments

问题的出现原因是脚本在保存数据到Parquet文件时,指定了一个不存在的目录作为保存路径,导致脚本运行时崩溃。解决方法是将保存路径更改为存在的目录。

以下是解决该问题的代码示例:

import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile
# 创建一个随机的DataFrame并保存为Parquet文件
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})
df.to_parquet('./test.parquet')  # 将保存路径更改为存在的目录
# ****** 下面是 Kornfield 的回答 ******
# 1. 打开Parquet文件
batch = ParquetFile('./test.parquet')
# 2. 定义批处理数据的生成器
record = batch.iter_batches(
    batch_size=10,
    columns=['B', 'C'],
)
# 3. 返回pandas/numpy数据
print(next(record).to_pandas())  # pandas
print(next(record).to_pydict())  # native python dict

通过将保存路径更改为`./test.parquet`,可以解决脚本在保存Parquet文件时指定不存在目录的问题。这样脚本就可以正常运行并读取Parquet文件中的数据了。

0