Ветвление Apache FlinkJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Ветвление Apache Flink

Сообщение Anonymous »

Обзор проблемы: < /strong>
Я работаю над приложением Flink, которое позволяет пользователям динамически разрабатывать данные Dataflows. Двигатель Core построен вокруг этапов, где DataStream передается последовательно через эти этапы. Каждый этап обрабатывает поток и выводит его, который затем передается на следующий этап. Каждый маршрут имеет: < /p>
case (условие): конкретное значение для проверки поля в записях DataStream.
Трубопровод с этапами: каждый маршрут может иметь свой Собственная уникальная последовательность этапов для обработки данных, которые соответствуют его случаю. /> Убедитесь, что запись входит в маршрут только тогда, когда его состояние соответствует случаю этого маршрута. Сначала построен, и данные не обрабатываются до начала работы. В связи с этим: < /p>
Если логика маршрутизации размещена вне функции ProcessElement (), она выполняет перед обработкой каких -либо данных, вызывая преждевременные маршруты.
Если Логика маршрутизации помещается в функцию ProcessElement (), я могу правильно направить отдельные записи, но:
Я не могу передать полученную маршрутируемое DataStream на последующие этапы.
processelement () работает только на отдельных записях, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, так что, так Это не позволяет мне обрабатывать полные преобразования данных динамически динамически для каждого маршрута. Создание плана.
Каждый маршрут должен иметь свой собственный независимый конвейер этапов, который должен обрабатывать только данные, которые соответствуют условию маршрута. происходит только тогда, когда поступят данные. Данные обрабатываются, потому что Flink оценивает логику преобразования заранее из -за ленивой оценки.
В результате введены все маршруты, что не является желаемым поведением.
логика маршрутизации Inside ProcessElement (): < /p >
, перемещая логику маршрутизации внутри процессаэлизации (), я могу правильно определить, к какому маршруту принадлежит запись. или передавать полученный маршрутизированный DataStream к соответствующему конвейеру этапов. : Field
Маршруты:
route a: field = "case1"
route b: field = "case2"
ожидаемое поведение: < /p>
Для каждой записи в DataStream:
Если значение поля равна «case1», запись должна быть направлена ​​на маршруту A и обрабатывается через его этапы.
Если значение поля равно "case2", Запись должна быть направлена ​​на маршрут B и обрабатывается на его этапах.
Стремительная оценка: все конвейеры маршрутов (например, этапы для маршрута A и маршрута B) выполняются до появления любых данных.
Обработка с одним рекордом: размещение логики в ProcessElement () позволяет мне Обработка отдельных записей, но я не могу динамически передать полученную маршрутируемое DataStream к последующим стадийным трубопроводам. Значения во время выполнения.
Каждый маршрут должен иметь свой собственный конвейер этапов, который обрабатывает данные, соответствующие его случаю. Пример для разъяснения
:
Мой код зависит от этапов, каждый этап может быть источником, преобразованием или раковиной, и каждый этап содержит функцию инициализации и выполнять Функция. Это настроено следующим образом: < /p>

Код: Выделить всё

stages=source:source1,rules:rules1,switch:switch1
switch1.type=switch
switch1.routes=routeA,routeB
switch1.field=user_id
routeA.case=1
routeA.stages=source:source2,rules:rules2,target:target1
routeB.case=2
routeB.stages=source:source3,rules:rules3,target:target2
Я исправил свой поток данных, чтобы он всегда содержал user_id, равный 1, поэтому теперь он всегда должен вводить маршрут A.
Но он всегда входит в оба маршрута.Это мой класс Switch Stage:
https://docs.google.com/document/d/1pJU ... sp=sharing
все журналы распечатываются до поступления данных, и код входит в инициализацию и выполнение всех маршрутов, он не входит в RouteSplitterFunction сначала решает, какой маршрут следует выбрать.

Надеюсь, это прояснит мою проблему.

Подробнее здесь: https://stackoverflow.com/questions/793 ... -branching
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»