Невозможно использовать данные с помощью последней версии соединителя Pyflink Kafka.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно использовать данные с помощью последней версии соединителя Pyflink Kafka.

Сообщение Anonymous »

Я пытаюсь прочитать данные из темы Kafka. Кафка настроена нормально. Теперь, когда я написал код, используя PyFlink, и независимо от того, добавляю ли я jar-файлы или нет, ошибка остается той же.
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common import SimpleStringSchema, Configuration

class SourceData(object):
def __init__(self, env):
self.env = env
self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
self.env.set_parallelism(1)
self.config = Configuration()
self.config.set_string("pipeline.jars", "file:///../jars/flink-sql-connector-kafka-1.17.1.jar")
self.env.configure(self.config)

def get_data(self):
source = KafkaSource.builder() \
.set_bootstrap_servers("localhost:9092") \
.set_topics("test-topic") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
self.env \
.add_source(source) \
.print()
self.env.execute("source")

SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()

Окружающая среда:
  • Flink 1.17.1
  • Java 11
  • Последняя версия клиента Kafka
  • Python 3.10.11
Ошибка:
TypeError: Could not found the Java class 'org.apache.flink.connector.kafka.source.KafkaSource.builder'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'

Я также пробовал без опции config и с использованием env.add_jars, но ошибка осталась прежней. Нужно ли мне настраивать что-нибудь еще?
Вторым вариантом, который я попробовал, было копирование jar в pyflink>lib внутри site-packages моей виртуальной среды. После этого я получаю следующую ошибку:
py4j.protocol.Py4JError: An error occurred while calling o12.addSource. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method addSource([class org.apache.flink.connector.kafka.source.KafkaSource, class java.lang.String, null]) does not exist


Подробнее здесь: https://stackoverflow.com/questions/769 ... -connector
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»