Предположим, у меня есть цепочка из трех задач, которые я хотел бы выполнить. иметь возможность работать с несколькими работниками. Например, граф зависимостей для трех воркеров может выглядеть так:
/ TaskC -> TaskB -> TaskA
- TaskC -> TaskB - > TaskA
\ TaskC -> TaskB -> TaskA
и я мог бы написать
< pre class="lang-python Prettyprint-override">
Код: Выделить всё
class entry(luigi.Task):
in_dir = luigi.Parameter()
def requires(self):
for f in self.in_dir:
yield taskC(pass_through=f)
def run(self):
some logic using self.input().path
from each worker in the above yield
class taskA(luigi.Task):
in_file_A = luigi.Parameter()
def output(self):
return luigi.LocalTarget('outA.txt')
def run(self):
some logic generating outA.txt
class taskB(luigi.Task):
pass_through = luigi.Parameter()
def output(self):
return luigi.LocalTarget('outB.txt')
def requires(self):
return taskA(in_file_A=self.pass_through)
def run(self):
some logic using self.input().path [outA.txt]
and generating self.output().path [outB.txt]
class taskC(luigi.Task):
pass_through = luigi.Parameter()
def output(self):
return luigi.LocalTarget('outC.txt')
def requires(self):
return taskB(pass_through=self.pass_through)
def run(self):
some logic using self.input().path [outB.txt]
and generating self.output().path [outC.txt]
Код: Выделить всё
luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/
Код: Выделить всё
class taskB(luigi.Task):
in_file_B = luigi.Parameter() # if we already have the output of taskA
pass_through = luigi.Parameter() # if we require taskA
def output(self):
return luigi.LocalTarget('outB.txt')
def requires(self):
if self.pass_through:
return taskA(in_file_A=self.pass_through)
def run(self):
if self.input().path:
logic_input = self.input().path
else:
logic_input = self.in_file_B
some logic using 'logic_input'
and generating self.output().path [outB.txt]
Подробнее здесь: https://stackoverflow.com/questions/466 ... ay-through