Код: Выделить всё
from typing import Dict
from pandas import DataFrame
from numpy import array
def my_iterator(*args) -> Dict[str, DataFrame]:
# Parse files with some processing already
yield {"key1": df1, "key2": df2}
def analyze(dfs: Dict[str, DataFrame], buffer: dict) -> array:
# Analysis on some columns of some DataFrame
pass
def custom_join(dfs: Dict[str, DataFrame], settings: array, buffer: dict) -> DataFrame:
# Custom join from multiple Dataframe to a single one.
pass
def write_to_database(key: str, df: DataFrame):
# Write df to a database using key.
pass
def main():
buffer_analysis = {}
buffer_join = {}
for dfs_dict in my_iterator(*args):
# Process data
analysis = analyze(dfs_dict, buffer_analysis)
res = custom_join(dfs_dict, analysis, buffer_join)
# Write result to disk.
write_to_database("key_res", res)
< /code>
У меня есть некоторые другие Worlfows, немного более сложные, здесь снова с итератором, дающим DICTS от DataFrames, и требует фильтрации, сегментированную кумулятивную агрегацию и заканчивая шагом запись. Data is ordered and analysis and custom join will keep a state of the data they have processed so far, taking it into account to process new data.
[*]But support of asyncio can be of interest (the write step may takes some time for instance, while the next loop can already be started)
[b]Identified solutions[/b]
Streamz действительно заинтересовался моим интересом, как легкий; Но, похоже, это не поддерживается. С этим я бы определил свой рабочий процесс с помощью функции Map ()
< /ul>
Код: Выделить всё
from streamz import Stream
# define 'analysis' and 'custom_join'
source = Stream()
analysis = source.map(analyze)
source.combine_latest(analysis).map(custom_join).sink(write_to_database)
def main():
buffer_analysis = {}
buffer_join = {}
for dfs_dict in my_iterator(*args):
# Yet have to workout how to use the buffers.
source.emit(dfs_dict)
< /code>
Dask, конечно, с dask.delayed < /code>; Но документация (и ответы в некоторых клетках) дает понять, что вы не можете применить dask.delayed
Дело в том, что все дайки полученных данных, которые даны, не могут быть материализованы в памяти в качестве списка, потому что это не вписывалось бы в ОЗУ.>
Подробнее здесь: https://stackoverflow.com/questions/796 ... cess-batch