import pandas as pd
import dask.dataframe as dd

data = {
    'zipcode': [14253, 2584, 25479, 14253, 14253, 2584],
    'property_type': [6, 9, 5, 5, 8, 6],
    'median_dom': [8, 7, 8, 7, 3, 8],
    'median_sale_price': [10.1, 5.2, 3.3, 7.4, 5.5, 2.6],
    'median_ppsf': [6, 9, 3, 2, 8, 6],
}
df = pd.DataFrame(data)
ddf = dd.from_pandas(df, npartitions=1)
ddf.head()
zipcode property_type median_dom median_sale_price median_ppsf
0 14253 6 8 10.1 6
1 2584 9 7 5.2 9
2 25479 5 8 3.3 3
3 14253 5 7 7.4 2
4 14253 8 3 5.5 8
And the function:
# recalculate mom (month-over-month) columns
def myfuntion():
    col_list = ['median_dom', 'median_sale_price', 'median_ppsf']
    for col in col_list:
        new_col = col + '_mom'
        ddf[new_col] = ddf[col].diff(1) / ddf[col]

After calling it, ddf looks like this:
zipcode property_type median_dom median_sale_price median_ppsf median_dom_mom median_sale_price_mom median_ppsf_mom
0 14253 6 8 10.1 6 NaN NaN NaN
1 2584 9 7 5.2 9 -0.142857 -0.942308 0.333333
2 25479 5 8 3.3 3 0.125000 -0.575758 -2.000000
3 14253 5 7 7.4 2 -0.142857 0.554054 -0.500000
4 14253 8 3 5.5 8 -1.333333 -0.345455 0.750000
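As an aside (my sketch, not part of the original code): myfuntion takes no arguments, writes into the module-level ddf, and returns None. The same calculation written so that the function receives a DataFrame and returns it, which is the shape groupby(...).apply calls it with, could look like this (add_mom is a name I made up):

def add_mom(frame):
    # frame is the DataFrame that .apply() passes in for each group
    col_list = ['median_dom', 'median_sale_price', 'median_ppsf']
    for col in col_list:
        frame[col + '_mom'] = frame[col].diff(1) / frame[col]
    return frame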
So far, so good. My goal is to run this function grouped by zipcode and property_type. I do the following:
ddf = ddf.groupby(['zipcode', 'property_type'])[['median_dom','median_sale_price','median_ppsf']].apply(myfuntion, meta=ddf)
But I get an empty DataFrame.
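(For comparison, a sketch of how I would expect the call to look with the returning variant above: the function is passed through to apply, and meta is given as a {column: dtype} mapping, a form Dask accepts for describing the output. The names cols, meta, and result are mine.)

cols = ['median_dom', 'median_sale_price', 'median_ppsf']
meta = {
    'median_dom': 'i8',
    'median_sale_price': 'f8',
    'median_ppsf': 'i8',
    'median_dom_mom': 'f8',
    'median_sale_price_mom': 'f8',
    'median_ppsf_mom': 'f8',
}
result = ddf.groupby(['zipcode', 'property_type'])[cols].apply(add_mom, meta=meta)
print(result.compute())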
The following commands show this information:
ddf._meta
zipcode property_type median_dom median_sale_price median_ppsf
print(ddf.info())
Columns: 5 entries, zipcode to median_ppsf
dtypes: float64(1), int64(4)
None
print(ddf)
Dask DataFrame Structure:
zipcode property_type median_dom median_sale_price median_ppsf
npartitions=1
int64 int64 int64 float64 int64
... ... ... ... ...
Dask Name: groupbyapply, 3 expressions
Expr=GroupByApply(frame=df[['zipcode', 'property_type', 'median_dom', 'median_sale_price', 'median_ppsf']], observed=False, _slice=['median_dom', 'median_sale_price', 'median_ppsf'], func=, meta=df, args=(), kwargs={})
Then print(ddf.head(3)) raises an AssertionError:
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
Cell In[9], line 1
----> 1 print(ddf.head(3))
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\dask_expr\_collection.py:692, in FrameBase.head(self, n, npartitions, compute)
690 out = new_collection(expr.Head(self, n=n, npartitions=npartitions))
691 if compute:
--> 692 out = out.compute()
693 return out
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\base.py:373, in DaskMethodsMixin.compute(self, **kwargs)
349 def compute(self, **kwargs):
350 """Compute this dask collection
351
352 This turns a lazy Dask collection into its in-memory equivalent.
(...) 371 dask.compute
372 """
--> 373 (result,) = compute(self, traverse=False, **kwargs)
374 return result
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\base.py:679, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
661 with shorten_traceback():
662 # The high level optimize will have to be called client side (for now)
663 # The optimize can internally trigger already a computation
(...) 675 # change the graph submission to a handshake which introduces all sorts
676 # of concurrency control issues)
678 expr = expr.optimize()
--> 679 keys = list(flatten(expr.__dask_keys__()))
681 results = schedule(expr, keys, **kwargs)
683 return repack(results)
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\_expr.py:1207, in _ExprSequence.__dask_keys__(self)
1205 all_keys = []
1206 for op in self.operands:
-> 1207 all_keys.append(list(op.__dask_keys__()))
1208 return all_keys
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\dask_expr\_expr.py:94, in Expr.__dask_keys__(self)
93 def __dask_keys__(self):
---> 94 return [(self._name, i) for i in range(self.npartitions)]
File ~\AppData\Local\Programs\Python\Python312\Lib\functools.py:995, in cached_property.__get__(self, instance, owner)
993 val = cache.get(self.attrname, _NOT_FOUND)
994 if val is _NOT_FOUND:
--> 995 val = self.func(instance)
996 try:
997 cache[self.attrname] = val
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\dask_expr\_expr.py:2593, in BlockwiseHead.npartitions(self)
2591 @functools.cached_property
2592 def npartitions(self):
-> 2593 return len(self._divisions()) - 1
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\dask_expr\_expr.py:2596, in BlockwiseHead._divisions(self)
2595 def _divisions(self):
-> 2596 return self.frame.divisions[: len(self._partitions) + 1]
File ~\AppData\Local\Programs\Python\Python312\Lib\functools.py:995, in cached_property.__get__(self, instance, owner)
993 val = cache.get(self.attrname, _NOT_FOUND)
994 if val is _NOT_FOUND:
--> 995 val = self.func(instance)
996 try:
997 cache[self.attrname] = val
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\dask_expr\_expr.py:431, in Expr.divisions(self)
429 @functools.cached_property
430 def divisions(self):
--> 431 return tuple(self._divisions())
File ~\AppData\Local\Programs\Python\Python312\Lib\site-packages\dask\dataframe\dask_expr\_expr.py:595, in Blockwise._divisions(self)
593 for arg in dependencies:
594 if not self._broadcast_dep(arg):
--> 595 assert arg.divisions == dependencies[0].divisions
596 return dependencies[0].divisions
AssertionError:
Any help will be appreciated.
Regards
I am testing on sample data before running this on 9 million rows.
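(As a reference for what the result should look like, the same grouped calculation can be run with plain pandas on the sample first; this is my sketch, reusing add_mom from above:)

cols = ['median_dom', 'median_sale_price', 'median_ppsf']
expected = df.groupby(['zipcode', 'property_type'])[cols].apply(add_mom)
print(expected)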
More details here: https://stackoverflow.com/questions/796 ... -dataframe