Обзор проблемы: < /strong>
Я работаю над приложением Flink, которое позволяет пользователям динамически разрабатывать данные Dataflows. Двигатель Core построен вокруг этапов, где DataStream передается последовательно через эти этапы. Каждый этап обрабатывает поток и выводит его, который затем передается на следующий этап. Каждый маршрут имеет: < /p>
case (условие): конкретное значение для проверки поля в записях DataStream.
Трубопровод с этапами: каждый маршрут может иметь свой Собственная уникальная последовательность этапов для обработки данных, которые соответствуют его случаю. /> Убедитесь, что запись входит в маршрут только тогда, когда его состояние соответствует случаю этого маршрута. Сначала построен, и данные не обрабатываются до начала работы. В связи с этим: < /p>
Если логика маршрутизации размещена вне функции ProcessElement (), она выполняет перед обработкой каких -либо данных, вызывая преждевременные маршруты.
Если Логика маршрутизации помещается в функцию ProcessElement (), я могу правильно направить отдельные записи, но:
Я не могу передать полученную маршрутируемое DataStream на последующие этапы.
processelement () работает только на отдельных записях, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, так что, так Это не позволяет мне обрабатывать полные преобразования данных динамически динамически для каждого маршрута. Создание плана.
Каждый маршрут должен иметь свой собственный независимый конвейер этапов, который должен обрабатывать только данные, которые соответствуют условию маршрута. происходит только тогда, когда поступят данные. Данные обрабатываются, потому что Flink оценивает логику преобразования заранее из -за ленивой оценки.
В результате введены все маршруты, что не является желаемым поведением.
логика маршрутизации Inside ProcessElement (): < /p >
, перемещая логику маршрутизации внутри процессаэлизации (), я могу правильно определить, к какому маршруту принадлежит запись. или передавать полученный маршрутизированный DataStream к соответствующему конвейеру этапов. : Field
Маршруты:
route a: field = "case1"
route b: field = "case2"
ожидаемое поведение: < /p>
Для каждой записи в DataStream:
Если значение поля равна «case1», запись должна быть направлена на маршруту A и обрабатывается через его этапы.
Если значение поля равно "case2", Запись должна быть направлена на маршрут B и обрабатывается на его этапах.
Стремительная оценка: все конвейеры маршрутов (например, этапы для маршрута A и маршрута B) выполняются до появления любых данных.
Обработка с одним рекордом: размещение логики в ProcessElement () позволяет мне Обработка отдельных записей, но я не могу динамически передать полученную маршрутируемое DataStream к последующим стадийным трубопроводам. Значения во время выполнения.
Каждый маршрут должен иметь свой собственный конвейер этапов, который обрабатывает данные, соответствующие его случаю. Пример для разъяснения :
Мой код зависит от этапов, каждый этап может быть источником, преобразованием или раковиной, и каждый этап содержит функцию инициализации и выполнять Функция. Это настроено следующим образом: < /p>
Я исправил свой поток данных, чтобы он всегда содержал user_id, равный 1, поэтому теперь он всегда должен вводить маршрут A.
Но он всегда входит в оба маршрута.Это мой класс Switch Stage: https://docs.google.com/document/d/1pJU ... sp=sharing
все журналы распечатываются до поступления данных, и код входит в инициализацию и выполнение всех маршрутов, он не входит в RouteSplitterFunction сначала решает, какой маршрут следует выбрать.
[b] Обзор проблемы: < /strong> Я работаю над приложением Flink, которое позволяет пользователям динамически разрабатывать данные Dataflows. Двигатель Core построен вокруг этапов, где DataStream передается последовательно через эти этапы. Каждый этап обрабатывает поток и выводит его, который затем передается на следующий этап. Каждый маршрут имеет: < /p> case (условие): конкретное значение для проверки поля в записях DataStream. Трубопровод с этапами: каждый маршрут может иметь свой Собственная уникальная последовательность этапов для обработки данных, которые соответствуют его случаю. /> Убедитесь, что запись входит в маршрут только тогда, когда его состояние соответствует случаю этого маршрута. Сначала построен, и данные не обрабатываются до начала работы. В связи с этим: < /p> Если логика маршрутизации размещена вне функции ProcessElement (), она выполняет перед обработкой каких -либо данных, вызывая преждевременные маршруты. Если Логика маршрутизации помещается в функцию ProcessElement (), я могу правильно направить отдельные записи, но: Я не могу передать полученную маршрутируемое DataStream на последующие этапы. processelement () работает только на отдельных записях, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, поэтому, так что, так Это не позволяет мне обрабатывать полные преобразования данных динамически динамически для каждого маршрута. Создание плана. Каждый маршрут должен иметь свой собственный независимый конвейер этапов, который должен обрабатывать только данные, которые соответствуют условию маршрута. происходит только тогда, когда поступят данные. Данные обрабатываются, потому что Flink оценивает логику преобразования заранее из -за ленивой оценки. В результате введены все маршруты, что не является желаемым поведением. логика маршрутизации Inside ProcessElement (): < /p > , перемещая логику маршрутизации внутри процессаэлизации (), я могу правильно определить, к какому маршруту принадлежит запись. или передавать полученный маршрутизированный DataStream к соответствующему конвейеру этапов. : Field Маршруты: route a: field = "case1" route b: field = "case2" ожидаемое поведение: < /p> Для каждой записи в DataStream: Если значение поля равна «case1», запись должна быть направлена на маршруту A и обрабатывается через его этапы. Если значение поля равно "case2", Запись должна быть направлена на маршрут B и обрабатывается на его этапах. Стремительная оценка: все конвейеры маршрутов (например, этапы для маршрута A и маршрута B) выполняются до появления любых данных. Обработка с одним рекордом: размещение логики в ProcessElement () позволяет мне Обработка отдельных записей, но я не могу динамически передать полученную маршрутируемое DataStream к последующим стадийным трубопроводам. Значения во время выполнения. Каждый маршрут должен иметь свой собственный конвейер этапов, который обрабатывает данные, соответствующие его случаю. Пример для разъяснения [/b]: Мой код зависит от этапов, каждый этап может быть источником, преобразованием или раковиной, и каждый этап содержит функцию инициализации и выполнять Функция. Это настроено следующим образом: < /p> [code]stages=source:source1,rules:rules1,switch:switch1 switch1.type=switch switch1.routes=routeA,routeB switch1.field=user_id routeA.case=1 routeA.stages=source:source2,rules:rules2,target:target1 routeB.case=2 routeB.stages=source:source3,rules:rules3,target:target2 [/code] Я исправил свой поток данных, чтобы он всегда содержал user_id, равный 1, поэтому теперь он всегда должен вводить маршрут A. Но он всегда входит в оба маршрута.[b]Это мой класс Switch Stage[/b]: https://docs.google.com/document/d/1pJUulzAmcMnYfawZqH7RUHvsWDb7ahGcxigYoab-D5M/edit?usp=sharing все журналы распечатываются до поступления данных, и код входит в инициализацию и выполнение всех маршрутов, он не входит в [b]RouteSplitterFunction[/b] сначала решает, какой маршрут следует выбрать.
У меня есть приложение Java 21, которое использует зависимости Apache Flink (версия 1.20.0) для фильтрации потока Kafka.
Когда я пытаюсь выполнить свою программу, я получаю следующую ошибку: п>
INFO...
Я пытаюсь подключиться к шлюзу Apache Flink SQL с помощью DBEAVER через JDBC. Я читал, что мне нужен файл JAR Flink-Sql-Gateway-Client-JDBC (или аналогичный) в качестве драйвера JDBC.
Однако я не могу найти его в любом хранилище Maven или в...