Pyspark 根据账户创建批次号列。
Pyspark 根据账户创建批次号列。
假设我有一个包含许多唯一账户值的pyspark dataframe,每个账户值都有一个唯一的条目数量,如下所示:
+-------------_+--------+--------+ | account| col1| col2 | col3 | +--------+-----+--------+--------+ | 325235 | 59| -6| 625.64| | 325235 | 23| -282| 923.47| | 325235 | 77|-1310.89| 3603.48| | 245623 | 120| 1.53| 1985.63| | 245623 | 106| -12| 1985.06| | 658567 | 84| -12| 194.67|
我想要指定一个批次大小,并根据批次大小将多个账户分配到同一个批次中。假设我选择批次大小为2,那么输出应该是以下内容:
+-------------_+--------+--------+--------------+ | account| col1| col2 | col3 | batch_number | +--------+-----+--------+--------+--------------+ | 325235 | 59| -6| 625.64| 1| | 325235 | 23| -282| 923.47| 1| | 325235 | 77|-1310.89| 3603.48| 1| | 245623 | 120| 1.53| 1985.63| 1| | 245623 | 106| -12| 1985.06| 1| | 658567 | 84| -12| 194.67| 2|
然后,我可以根据batch_number
列进行分组,每个批次中可以有多个账户。以下是我的工作代码,但由于我使用了toPandas(),所以速度太慢了。
# 获取源数据中唯一的账户 accounts = [row.account for row in source_data.select("account").distinct().collect()] # 根据批次大小计算批次数量。最后一个批次的大小为余数 num_batches, remainder = divmod(len(accounts), batchsize) # 创建批次dataframe,为每个账户分配一个批次号 batches = [i for _ in range(batchsize) for i in range(1, int(num_batches) + 1)] + [num_batches + 1 for i in range(remainder)] batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account") # 在源数据中添加一个批次号为零的列,稍后将进行填充 source_data = source_data.withColumn("batch_number", lit(0)) # 将账户的批次号映射回源数据中 source_data_p = source_data.toPandas() for ind in source_data_p.index: source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"] # 将映射后的pandas dataframe转换回spark dataframe batched_df = sqlcontext.createDataFrame(source_data_p)
我希望能够摆脱toPandas()的调用,并在pyspark中进行映射。我看到了一些相关的帖子,比如这个:How to batch up items from a PySpark DataFrame,但这并不适用于我的代码流程,所以我将不得不重新编写整个项目才能实现这个。
在上述代码中,我们可以看到一个使用Pyspark创建批次编号列的示例。这个问题的出现是因为需要对数据进行批次处理,以便更好地组织和分析数据。
解决方法是使用Pyspark的StringIndexer函数来创建一个索引器,并将其应用于数据框。然后,使用Pyspark的floor函数对索引进行除法运算,并加1以得到批次编号列。最后,将结果显示出来。
通过上述代码,我们可以看到最终的结果数据框包含了批次编号列,该列基于账户列进行了分组。每个唯一的账户ID都被赋予了一个批次编号。
这个方法可以帮助我们更好地组织和管理数据,以便进行后续的分析和处理。它可以用于各种场景,例如按账户对数据进行分组、按时间窗口对数据进行分组等。
总结起来,通过使用Pyspark的StringIndexer函数和floor函数,我们可以很容易地创建一个批次编号列,以便更好地组织和分析数据。这个方法非常灵活,可以根据不同的需求进行定制和调整。