Java.lang.nosuchmethodexception в Python Flink jdbcPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Java.lang.nosuchmethodexception в Python Flink jdbc

Сообщение Anonymous »

Я пытаюсь прочитать данные из Kafka и погрузить данные в PostgreSQL с Flink StreamingAPI с использованием Python. Я использую Flink версию 1.19.1 и PostgreSQL 14.13. Вот мой код < /p>
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource
from pyflink.common.typeinfo import Types
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, RuntimeContext
from pyflink.common import WatermarkStrategy
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.state import ValueStateDescriptor

class CountWindowAverage(FlatMapFunction):
def __init__(self):
self.sum = None

def open(self, runtime_context: RuntimeContext):
descriptor = ValueStateDescriptor(
"average",
Types.TUPLE([Types.INT(), Types.INT()])
)
self.sum = runtime_context.get_state(descriptor)

def flat_map(self, value):
current_sum = self.sum.value()
if current_sum is None:
current_sum = (0, 0)

current_sum = (current_sum[0] + 1, current_sum[1] + value[3])

self.sum.update(current_sum)

if current_sum[0] >= 2:
self.sum.clear()
yield (value[0], current_sum[0], current_sum[1])

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_python_executable("/home/mhtuan/anaconda3/envs/flink-env/bin/python")
env.add_jars("file:///home/mhtuan/work/flink_kafka/flink_sql_kafka.jar", "file:///home/mhtuan/work/flink_kafka/flink-connector-jdbc-3.2.0-1.19.jar", "file:///home/mhtuan/work/flink_kafka/postgresql-42.7.3.jar")

deserialization_schema = JsonRowDeserializationSchema.builder().type_info(
type_info=Types.ROW_NAMED(
["id", "hour_key", "name", "age"], [Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])).build()

source = KafkaSource.builder() \
.set_bootstrap_servers("localhost:9091,localhost:9092,localhost:9093") \
.set_topics("test-flink-3") \
.set_group_id("flink-1") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(deserialization_schema) \
.build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds = ds.key_by(lambda record: record["id"], key_type=Types.INT()).flat_map(CountWindowAverage())

ds.add_sink(JdbcSink.sink(
"insert into flink.flatmap_age (id, count, sum) values (?, ?, ?)",
Types.ROW([Types.INT(), Types.INT(), Types.INT()]),
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.with_url('jdbc:postgresql://localhost:5432/postgres')
.with_driver_name('org.postgresql.Driver')
.with_user_name('postgres')
.with_password('postgres')
.build(),
JdbcExecutionOptions.builder()
.with_batch_interval_ms(1000)
.with_batch_size(200)
.with_max_retries(5)
.build()
))

env.execute("kafka_sink_example")
< /code>
Все работает нормально
При запуске кода я получаю следующую ошибку: < /p>
Traceback (most recent call last):
File "/home/mhtuan/work/flink_kafka/./src/main.py", line 102, in
ds.add_sink(JdbcSink.sink(
File "/home/mhtuan/work/flink-1.19.1/opt/python/pyflink.zip/pyflink/datastream/connectors/jdbc.py", line 60, in sink
File "/home/mhtuan/work/flink-1.19.1/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/home/mhtuan/work/flink-1.19.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/home/mhtuan/work/flink-1.19.1/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o174.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
< /code>
Кто -нибудь знает, как это исправить? Большое спасибо!

Подробнее здесь: https://stackoverflow.com/questions/789 ... flink-jdbc
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Java.lang.nosuchmethodexception в Python Flink jdbc
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Java.lang.nosuchmethodexception: org.apache.flink.connector.jdbc.internal.jdbcoutputformat.createrowjdbcstatementbuilder
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Где я могу найти банку Flink-sql-gateway-client-jdbc, чтобы соединить шлюз Flink SQL через DBEAVER?
    Anonymous » » в форуме JAVA
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Ошибка сборки собственного образа GrallVM «java.lang.NoSuchMethodException: sun.invoke.util.ValueConversions.booleanToİn
    Anonymous » » в форуме JAVA
    0 Ответы
    74 Просмотры
    Последнее сообщение Anonymous
  • Вызвано: java.lang.NoSuchMethodException: [класс android.app.Application] - viewmodel
    Anonymous » » в форуме Android
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»