InvalidIndexError dask dataframe
I'm learning dask and keep running into this error from time to time:
InvalidIndexError: Reindexing only valid with uniquely valued Index objects
I have a preprocessed dask df on which many operations work fine, but some simple ones raise this error.
I tried resetting the index, but it didn't seem to help.
Question: is there a high-level answer explaining what can go wrong here? Where can I read about it? Why does this problem usually occur? Googling didn't help.
For example, here is a strange case:
I check the df's stats after every operation to catch anything suspicious.
```python
df = load_data()
df.shape[0].compute(), df.npartitions  # ok

df = prepare_target(df)
df.shape[0].compute(), df.npartitions  # ok

x_train, x_test, y_train, y_test = dask_tts(df.drop('target', 1),
                                            df['target'], random_state=1)  # ok

x_train['target'] = y_train
x_test['target'] = y_test  # ok

x_train.shape[0].compute(), x_train.npartitions
x_test.shape[0].compute(), x_test.npartitions  # ok

x_train.index.nunique().compute()
x_test.index.nunique().compute()  # ok

train, smooth_dict = smoothed_likelyhood(x_train)  # returns df and dict
train.shape[0].compute()  # ok

test, _ = smoothed_likelyhood(x_test)
test.shape[0].compute()  # ok

train.index.nunique().compute()  # ok
test.index.nunique().compute()   # after this line - error
# InvalidIndexError: Reindexing only valid with uniquely valued Index objects
```
Note that only test raises the error.
Here I tried to reproduce it, but it works as expected:
```python
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split


def smoothed_likelyhood(df, alpha=1):  # works with dask df
    global_mean = df['target'].mean()
    smooth_dict = {}
    final_df = df.copy()
    for c in [c for c in df.columns if c != 'target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        temp_result = ((all_means * n_rows + global_mean + alpha) /
                       (n_rows + alpha))
        final_df[c] = df[c].map(temp_result)
        smooth_dict[c] = temp_result.compute().to_dict()
    return final_df, smooth_dict


# TOY EXAMPLE
test = pd.DataFrame({'a': ['mos', 'mos', 'london', 'dubai', 'ny', 'mos',
                           'london', 'dubai', 'shanghai', 'dubai', 'mos',
                           'london', 'dubai', 'dubai'],
                     'b': ['big', 'big', 'big', 'med', 'med', 'med', 'small',
                           'small', 'small', 'small', 'big', 'med', 'small',
                           'med'],
                     'target': [1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0]})

df = dd.from_pandas(test, npartitions=2)
# -----------------------------------------------
print(f'npartitions: {df.npartitions}')

x_train, x_test, y_train, y_test = train_test_split(df.drop('target', 1),
                                                    df['target'],
                                                    test_size=0.3,
                                                    shuffle=True)
x_train['target'] = y_train
x_test['target'] = y_test

print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())

train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)

print(train.shape[0].compute(), train.index.nunique().compute())
print(test.shape[0].compute(), test.index.nunique().compute())

# train.compute()
print(train['target'].mean().compute())
print(test['target'].mean().compute())
```
That part works fine, but when I try it with the real data:
```python
%%time
df = load_data(stage='prep_2', frac=config.FRAC, verbose=False)  # loading many parquet files
df = prepare_target(df)  # some small changes to df
# -----------------------------------------------
print(f'npartitions: {df.npartitions}')

x_train, x_test, y_train, y_test = train_test_split(df.drop('target', 1),
                                                    df['target'],
                                                    random_state=1)
x_train['target'] = y_train
x_test['target'] = y_test

print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())

train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)

print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())
```
These are the printed results:
```
npartitions: 10
1476758 164300
164018 106750
1476758 164300
164018 106750
```
Either of these raises the index error mentioned above:
```python
train['target'].mean().compute()
test['target'].mean().compute()
```
If you have any ideas, I can investigate further; I just don't know where to start.
Thanks.
The problem turned out to be in the smoothed_likelyhood function: temp_result, a dask Series, was used as the argument to map, and that is what produced the invalid-index error. In pandas, when you want to replace some values with others, you can pass either a dict or a Series to map; in dask, temp_result cannot be turned into a dict directly, only after it has been computed. While experimenting I found that the plain df[c].map(series) call seemed to magically work, but it was what introduced the invalid-index problem.
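For reference, the exact trigger inside dask depends on how partitions get aligned, but the pandas-level error itself is easy to reproduce: Series.map with a Series mapper looks values up through the mapper's index, and that lookup refuses non-unique indexes. A minimal sketch with invented toy values:

```python
import pandas as pd
from pandas.errors import InvalidIndexError

# a mapping Series whose index is NOT unique
mapping = pd.Series([0.2, 0.8], index=['mos', 'mos'])

try:
    pd.Series(['mos', 'london']).map(mapping)
except InvalidIndexError as err:
    print(err)  # Reindexing only valid with uniquely valued Index objects
```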
The fix is to map with the precomputed dict smooth_dict instead of with the Series temp_result. Computing early is probably not ideal in dask, but I need the result anyway, and as far as I can tell it is only computed once here.
Here is a working version of the function:
```python
def smoothed_likelyhood(df, alpha=1):
    """
    Description: preprocess based on mean values of positive events in each category

    Args:
        df: [df] - df to encode
        alpha: [int/float] - regularization param. We can find it with CV

    Returns:
        encoded df, dict to encode user during prediction
    """
    global_mean = df['target'].mean()
    smooth_dict = {}

    for c in [c for c in df.columns if c != 'target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        temp_result = ((all_means * n_rows + global_mean + alpha) /
                       (n_rows + alpha))
        smooth_dict[c] = temp_result.compute().to_dict()
        df[c] = df[c].map(smooth_dict[c])

    return df, smooth_dict
```
However, this function takes about 25 minutes on an 800k x 90 dataframe, and that is only 5% of the total data. If anyone can suggest how to speed it up in dask, I would appreciate it.