Pyarrow 在将 CSV 转换为 Parquet 时查找错误行。

4 浏览
0 Comments

Pyarrow 在将 CSV 转换为 Parquet 时查找错误行。

在将一个大型csv文件转换为parquet时,我遇到了CSV列#10:转换为字符串的CSV转换错误:无效的UTF8数据。从错误信息来看,似乎由于存在无效的utf-8字符,无法将适当的列数据转换为String类型。

也许通过在pyarrow.ReadOptions中使用适当的编码方案可以修复此问题。但我想知道导致错误的那一行。

由于这是一个有数百万行的大文件,我无法确定导致错误的那一行。

在pyarrow的read_csv函数中是否有任何选项可以报告错误行?或者如果我们可以将该特定单元格替换为NANNULL,那将更好。

0
0 Comments

问题:如何在将CSV转换为Parquet时找到错误行,并解决该问题?

原因:在将CSV文件转换为Parquet文件时,如果CSV文件包含非UTF-8编码的字符,会导致在pyarrow和其他库中解码输入为Unicode字符串时出现问题。

解决方法:可以采用以下方法找到错误行,并在读取CSV文件时进行预处理:

1. 在命令行中使用grep命令找到错误行:grep -axv '.*' file.csv

2. 使用以下代码对输入进行预处理,将非UTF-8编码字符删除:

   import io
   import pyarrow.csv as pv
   import pathlib
   
   class UnicodeErrorIgnorerIO(io.IOBase):
       """Simple wrapper for a BytesIO that removes non-UTF8 input.
       If a file contains non-UTF8 input, it causes problems in pyarrow and other libraries
       that try to decode the input to unicode strings. This just removes the offending bytes.
       >>> io = io.BytesIO(b"INT\xbfL LICENSING INDUSTRY MERCH ASSOC")
       >>> io = UnicodeErrorIgnorerIO(io)
       >>> io.read()
       'INTL LICENSING INDUSTRY MERCH ASSOC'
       """
       def __init__(self, file: io.BytesIO) -> None:
           self.file = file
       def read(self, n=-1):
           return self.file.read(n).decode("utf-8", "ignore").encode("utf-8")
       def readline(self, n=-1):
           return self.file.readline(n).decode("utf-8", "ignore").encode("utf-8")
       def readable(self):
           return True
   
   def read_csv(path: pathlib.Path):
       with open(path, "rb") as f:
           f = UnicodeErrorIgnorerIO(f)
           return pv.read_csv(f)
   

以上代码定义了一个名为`UnicodeErrorIgnorerIO`的类,该类是对`BytesIO`的简单包装,用于删除非UTF-8编码的输入。`read_csv`函数使用`UnicodeErrorIgnorerIO`对输入进行预处理,然后使用`pv.read_csv`函数读取CSV文件。

0
0 Comments

Pyarrow在将CSV转换为Parquet时,没有提供报告行号或失败行的选项。目前正在进行一些改进错误处理的工作,但即使这项工作也不会显示解码错误的行号。建议创建一个JIRA问题。

如上所述,可以将列指定为二进制,然后在内存中手动检查它。可以使用cast计算函数将二进制转换为字符串,这将执行UTF8验证,但很遗憾,目前它也不会报告失败的索引。

作为解决方法,可以使用pandas的CSV解析器,它应该给出失败的字节偏移量:

>>> import pandas
>>> pandas.read_csv("/tmp/blah.csv")
Traceback (most recent call last):
  ... # 省略部分内容
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 29: invalid start byte

以上是解决此问题的方法。

0