Код: Выделить всё
from typing import Dict
from pandas import DataFrame
from numpy import array
def my_iterator(*args) -> Dict[str, DataFrame]:
# Parse files with some processing already
yield {"key1": df1, "key2": df2}
def analyze(dfs: Dict[str, DataFrame], buffer: dict) -> array:
# Analysis on some columns of some DataFrame
pass
def custom_join(dfs: Dict[str, DataFrame], settings: array, buffer: dict) -> DataFrame:
# Custom join from multiple Dataframe to a single one.
pass
def write_to_database(key: str, df: DataFrame):
# Write df to a database using key.
pass
def main():
buffer_analysis = {}
buffer_join = {}
for dfs_dict in my_iterator(*args):
# Process data
analysis = analyze(dfs_dict, buffer_analysis)
res = custom_join(dfs_dict, analysis, buffer_join)
# Write result to disk.
write_to_database("key_res", res)
< /code>
У меня есть некоторые другие Worlfows, немного более сложные, здесь снова с итератором, дающим DICTS от DataFrames, и требует фильтрации, сегментированную кумулятивную агрегацию и заканчивая шагом запись. Данные упорядочены, и анализ и пользовательское соединение сохранит состояние данных, которые они обработали до сих пор, принимая во внимание их для обработки новых данных.
[*] Но поддержка асинсио может быть интересной (например, шаг записи может занять некоторое время, в то время как следующий цикл уже может быть запущен)
round> round) Streamz действительно заинтересовался моим интересом, как легкий; Но, похоже, это не поддерживается. С этим я бы определил свой рабочий процесс с помощью функции Map ()
< /ul>
Код: Выделить всё
from streamz import Stream
# define 'analysis' and 'custom_join'
source = Stream()
analysis = source.map(analyze)
source.combine_latest(analysis).map(custom_join).sink(write_to_database)
def main():
buffer_analysis = {}
buffer_join = {}
for dfs_dict in my_iterator(*args):
# Yet have to workout how to use the buffers.
source.emit(dfs_dict)
< /code>
Dask, конечно, с dask.delayed < /code>; Но документация (и ответы в некоторых клетках) дает понять, что вы не можете применить dask.delayed
Дело в том, что все дайки полученных данных, которые даны, не могут быть материализованы в памяти в качестве списка, потому что это не вписывалось бы в ОЗУ.>
Подробнее здесь: https://stackoverflow.com/questions/796 ... cess-batch