Луиджи Wowkflows с несколькими асинхронными вызовами?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Луиджи Wowkflows с несколькими асинхронными вызовами?

Сообщение Anonymous »

Я пытаюсь реализовать рабочий процесс luigi, который взаимодействует с OneDrive с помощью msgraph, в три отдельных этапа:
  • Извлечение объектов с диска
  • Извлечение содержимого из объектов диска.
  • Удаление объектов диска.

Код: Выделить всё

# simplified for the sake of clarity/brevity

class RegisterTask(luigi.Task):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# custom class that manages targets across tasks
self.register = TargetRegister(self)

class OneDriveItemsExtractTask(RegisterTask)
def output(self):
return self.register.generate_output(PickleTarget)

def run(self):
data = OneDriveData(
"my_connection",
"my_parent_directory",
["item1.csv", "item2.csv"],
"csv"
)
items = asyncio.run(get_onedrive_items(data))
item_ids = [item.id for item in items]
self.output.write(item_ids)

class OneDriveContentExtractTask(RegisterTask)
def requires(self):
return OneDriveItemsExtractTask(**self.param_kwargs)

def output(self):
return self.register.generate_output(ParquetTarget)

def run(self):
item_ids = self.input().read()
content = asyncio.run(get_onedrive_content("my_connection", item_ids))

class DeleteOneDriveItemsTask(RegisterTask)
def requires(self):
return {
"item_ids": OneDriveItemsExtractTask(**self.param_kwargs),
"extract_compete": OneDriveContentExtractTask(**self.param_kwargs),
}

def output(self):
return self.register.generate_output(ParquetTarget)

def run(self):
item_ids = self.input()["item_ids"].read()
asyncio.run(delete_onedrive_items("my_connection", item_ids))
Задачи и рабочий процесс проходят модульные тесты, если они изолированы от зависимости msgraph с помощью тестовых приспособлений, как при отправке в планировщик luigi, так и при вызове метода run() после создания экземпляра. .
Выполнение задач вне планировщика luigi без тестовых приспособлений также работает должным образом.
Однако запуск рабочего процесса при интеграционном тестировании на реальном компьютере Конечная точка графа приводит к ошибке RuntimeError: цикл событий закрыт.
Вот обходной путь, который я придумал, но он неоптимален:

Код: Выделить всё

class OneDriveContentExtractTask(RegisterTask)
def output(self):
return self.register.generate_output(ParquetTarget)

async def _async_operations(self, data:OneDriveData):
items = await get_onedrive_items(data)
data.items = []
content = await get_onedrive_content(data, items)

if self.delete_extracted_items and not content.empty():
delete_onedrive_items(data.connection, items)

return content

def run(self):
data = OneDriveData(
"my_connection",
"my_parent_directory",
["item1.csv", "item2.csv"],
"csv"
)
df = asyncio.run(self._async_operations(data))

self.output().write(df)
Это работает. Кажется, что ломает Луиджи, так это наличие нескольких асинхронных циклов в одном рабочем процессе.
Я был бы признателен за любую информацию о том, как правильно управлять несколькими асинхронными вызовами в этом контексте.
Я использую luigi 3.4.0.

Подробнее здесь: https://stackoverflow.com/questions/786 ... sync-calls
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Луиджи Wowkflows с несколькими асинхронными вызовами?
    Anonymous » » в форуме Python
    0 Ответы
    20 Просмотры
    Последнее сообщение Anonymous
  • Phpunit: Как издеваться над несколькими вызовами методов с несколькими аргументами?
    Anonymous » » в форуме Php
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Гибкий конвейер Луиджи и передача параметров на всем протяжении
    Anonymous » » в форуме Python
    0 Ответы
    25 Просмотры
    Последнее сообщение Anonymous
  • Тип аннотации для параметров задач Луиджи
    Anonymous » » в форуме Python
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • C# UDP-сервер с несколькими асинхронными клиентами | SocketException при отключении клиента
    Anonymous » » в форуме C#
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»