Pyspark 根据账户创建批次号列。

9 浏览
0 Comments

Pyspark 根据账户创建批次号列。

假设我有一个包含许多唯一账户值的pyspark dataframe,每个账户值都有一个唯一的条目数量,如下所示:

+-------------_+--------+--------+
| account| col1|  col2  | col3   |
+--------+-----+--------+--------+
| 325235 |   59|      -6|  625.64|
| 325235 |   23|    -282|  923.47|
| 325235 |   77|-1310.89| 3603.48|
| 245623 |  120|    1.53| 1985.63|
| 245623 |  106|     -12| 1985.06|
| 658567 |   84|     -12|  194.67|

我想要指定一个批次大小,并根据批次大小将多个账户分配到同一个批次中。假设我选择批次大小为2,那么输出应该是以下内容:

+-------------_+--------+--------+--------------+
| account| col1|  col2  | col3   | batch_number |
+--------+-----+--------+--------+--------------+
| 325235 |   59|      -6|  625.64|             1|
| 325235 |   23|    -282|  923.47|             1|
| 325235 |   77|-1310.89| 3603.48|             1|
| 245623 |  120|    1.53| 1985.63|             1|
| 245623 |  106|     -12| 1985.06|             1|
| 658567 |   84|     -12|  194.67|             2|

然后,我可以根据batch_number列进行分组,每个批次中可以有多个账户。以下是我的工作代码,但由于我使用了toPandas(),所以速度太慢了。

# 获取源数据中唯一的账户
accounts = [row.account for row in source_data.select("account").distinct().collect()]
# 根据批次大小计算批次数量。最后一个批次的大小为余数
num_batches, remainder = divmod(len(accounts), batchsize)
# 创建批次dataframe,为每个账户分配一个批次号
batches = [i for _ in range(batchsize) for i in range(1, int(num_batches) + 1)] + [num_batches + 1 for i in range(remainder)]
batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account")
# 在源数据中添加一个批次号为零的列,稍后将进行填充
source_data = source_data.withColumn("batch_number", lit(0))
# 将账户的批次号映射回源数据中
source_data_p = source_data.toPandas()
for ind in source_data_p.index:
    source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"]
# 将映射后的pandas dataframe转换回spark dataframe
batched_df = sqlcontext.createDataFrame(source_data_p)

我希望能够摆脱toPandas()的调用,并在pyspark中进行映射。我看到了一些相关的帖子,比如这个:How to batch up items from a PySpark DataFrame,但这并不适用于我的代码流程,所以我将不得不重新编写整个项目才能实现这个。

0
0 Comments

在上述代码中,我们可以看到一个使用Pyspark创建批次编号列的示例。这个问题的出现是因为需要对数据进行批次处理,以便更好地组织和分析数据。

解决方法是使用Pyspark的StringIndexer函数来创建一个索引器,并将其应用于数据框。然后,使用Pyspark的floor函数对索引进行除法运算,并加1以得到批次编号列。最后,将结果显示出来。

通过上述代码,我们可以看到最终的结果数据框包含了批次编号列,该列基于账户列进行了分组。每个唯一的账户ID都被赋予了一个批次编号。

这个方法可以帮助我们更好地组织和管理数据,以便进行后续的分析和处理。它可以用于各种场景,例如按账户对数据进行分组、按时间窗口对数据进行分组等。

总结起来,通过使用Pyspark的StringIndexer函数和floor函数,我们可以很容易地创建一个批次编号列,以便更好地组织和分析数据。这个方法非常灵活,可以根据不同的需求进行定制和调整。

0