将Pyspark df中超过5000万条数据写入PostgresSQL的最高效方法。

9 浏览
0 Comments

将Pyspark df中超过5000万条数据写入PostgresSQL的最高效方法。

如何将数百万条记录(比如5000万条)从Spark DataFrame高效地插入到PostgreSQL表中?我之前用过Spark将数据插入到MSSQL中,使用了批量复制和批量大小的选项,非常成功。在PostgreSQL中是否有类似的方法呢?以下是我尝试的代码和运行时间:\n

def inserter():
    start = timer()
    sql_res.write.format("jdbc").option("numPartitions","5").option("batchsize","200000")\
    .option("url", "jdbc:postgresql://xyz.com:5435/abc_db") \
    .option("dbtable", "public.full_load").option("user", "root").option("password", "password").save()
    end = timer()
    print(timedelta(seconds=end-start))
inserter()

\n所以我对1000万条记录采用了上述方法,使用了5个并行连接(根据`numPartitions`指定),批量大小为20万条。整个过程耗时为14分05.760926秒。是否有其他更高效的方法来减少时间?我应该使用什么批量大小才更高效?增加批量大小是否可以更快地完成任务?或者开启多个连接(大于5个)是否可以加快过程?对于1000万条记录来说,平均14分钟的时间已经不错了,但希望有经验的人能帮忙解答这个问题。

0
0 Comments

问题的原因是在将Pyspark数据框中的超过5000万行数据写入PostgresSQL时,需要找到最佳高效的方法。为了解决这个问题,可以优化Spark与PostgresSQL之间的通信,特别是从Spark流向PostgresSQL的数据流。需要注意的是,不要忘记Spark的一侧。如果分区数量与PostgreSQL支持的最大连接数相比太高,那么执行mapPartitions是没有意义的。如果有太多的分区,并且为每个分区打开一个连接,可能会出现以下错误org.postgresql.util.PSQLException: FATAL: sorry, too many clients already

为了调整插入过程,可以按照以下步骤进行处理:

  • 记住分区的数量很重要。检查分区的数量,然后根据您想要拥有的并行连接数量进行调整。您可能希望每个分区有一个连接,因此建议检查coalesce,如这里所述。
  • 检查您的PostgreSQL实例支持的最大连接数,并且您希望增加该数量
  • 对于将数据插入PostgreSQL,建议使用COPY命令这里还有一个更详细的关于如何加速postgreSQL插入的答案。

最后,没有万能的解决方案来完成这项工作。您可以使用我上面提到的所有技巧,但实际效果将取决于您的数据和使用情况。

0