Гибкий конвейер Луиджи и передача параметров на всем протяженииPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Гибкий конвейер Луиджи и передача параметров на всем протяжении

Сообщение Anonymous »

Недавно я реализовал конвейер Луиджи для обработки данных в одном из наших биоинформатических конвейеров. Однако в настройке этих задач есть кое-что фундаментальное, чего я не понимаю.

Предположим, у меня есть цепочка из трех задач, которые я хотел бы выполнить. иметь возможность работать с несколькими работниками. Например, граф зависимостей для трех воркеров может выглядеть так:

/ TaskC -> TaskB -> TaskA

- TaskC -> TaskB - > TaskA

\ TaskC -> TaskB -> TaskA

и я мог бы написать

< pre class="lang-python Prettyprint-override">

Код: Выделить всё

class entry(luigi.Task):

in_dir = luigi.Parameter()

def requires(self):
for f in self.in_dir:
yield taskC(pass_through=f)

def run(self):
some logic using self.input().path
from each worker in the above yield

class taskA(luigi.Task):

in_file_A = luigi.Parameter()

def output(self):
return luigi.LocalTarget('outA.txt')

def run(self):
some logic generating outA.txt

class taskB(luigi.Task):

pass_through = luigi.Parameter()

def output(self):
return luigi.LocalTarget('outB.txt')

def requires(self):
return taskA(in_file_A=self.pass_through)

def run(self):
some logic using self.input().path [outA.txt]
and generating self.output().path [outB.txt]

class taskC(luigi.Task):

pass_through = luigi.Parameter()

def output(self):
return luigi.LocalTarget('outC.txt')

def requires(self):
return taskB(pass_through=self.pass_through)

def run(self):
some logic using self.input().path [outB.txt]
and generating self.output().path [outC.txt]
Если мой код находится в конвейере.py, я мог бы запустить его с помощью:

Код: Выделить всё

luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/
Тот факт, что я отправляю параметр pass_through полностью в задачу TaskA, не кажется правильным подходом. Более того, если когда-нибудь в будущем у меня уже будут данные, сгенерированные (отдельно) задачей TaskA, задача B окажется недостаточно гибкой, чтобы справиться с этой ситуацией. Возможно, я мог бы написать:

Код: Выделить всё

class taskB(luigi.Task):

in_file_B = luigi.Parameter() # if we already have the output of taskA
pass_through = luigi.Parameter() # if we require taskA

def output(self):
return luigi.LocalTarget('outB.txt')

def requires(self):
if self.pass_through:
return taskA(in_file_A=self.pass_through)

def run(self):
if self.input().path:
logic_input = self.input().path
else:
logic_input = self.in_file_B

some logic using 'logic_input'
and generating self.output().path [outB.txt]
Мне хотелось бы знать, является ли это «подходящим» шаблоном проектирования для Луиджи или я совершенно не в теме.

Подробнее здесь: https://stackoverflow.com/questions/466 ... ay-through
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»