- Извлечение объектов с диска
- Извлечение содержимого из объектов диска.
- Удаление объектов диска.
Код: Выделить всё
# simplified for the sake of clarity/brevity
class RegisterTask(luigi.Task):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# custom class that manages targets across tasks
self.register = TargetRegister(self)
class OneDriveItemsExtractTask(RegisterTask)
def output(self):
return self.register.generate_output(PickleTarget)
def run(self):
data = OneDriveData(
"my_connection",
"my_parent_directory",
["item1.csv", "item2.csv"],
"csv"
)
items = asyncio.run(get_onedrive_items(data))
item_ids = [item.id for item in items]
self.output.write(item_ids)
class OneDriveContentExtractTask(RegisterTask)
def requires(self):
return OneDriveItemsExtractTask(**self.param_kwargs)
def output(self):
return self.register.generate_output(ParquetTarget)
def run(self):
item_ids = self.input().read()
content = asyncio.run(get_onedrive_content("my_connection", item_ids))
class DeleteOneDriveItemsTask(RegisterTask)
def requires(self):
return {
"item_ids": OneDriveItemsExtractTask(**self.param_kwargs),
"extract_compete": OneDriveContentExtractTask(**self.param_kwargs),
}
def output(self):
return self.register.generate_output(ParquetTarget)
def run(self):
item_ids = self.input()["item_ids"].read()
asyncio.run(delete_onedrive_items("my_connection", item_ids))
Выполнение задач вне планировщика luigi без тестовых приспособлений также работает должным образом.
Однако запуск рабочего процесса при интеграционном тестировании на реальном компьютере Конечная точка графа приводит к ошибке RuntimeError: цикл событий закрыт.
Вот обходной путь, который я придумал, но он неоптимален:
Код: Выделить всё
class OneDriveContentExtractTask(RegisterTask)
def output(self):
return self.register.generate_output(ParquetTarget)
async def _async_operations(self, data:OneDriveData):
items = await get_onedrive_items(data)
data.items = []
content = await get_onedrive_content(data, items)
if self.delete_extracted_items and not content.empty():
delete_onedrive_items(data.connection, items)
return content
def run(self):
data = OneDriveData(
"my_connection",
"my_parent_directory",
["item1.csv", "item2.csv"],
"csv"
)
df = asyncio.run(self._async_operations(data))
self.output().write(df)
Я был бы признателен за любую информацию о том, как правильно управлять несколькими асинхронными вызовами в этом контексте.
Я использую luigi 3.4.0.
Подробнее здесь: https://stackoverflow.com/questions/786 ... sync-calls