Я пытаюсь прочитать данные из Kafka и загрузить их в PostgreSQL через Flink DataStream API на Python. Я использую Flink 1.19.1 и PostgreSQL 14.13. Вот мой код:
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource
from pyflink.common.typeinfo import Types
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, RuntimeContext
from pyflink.common import WatermarkStrategy
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.state import ValueStateDescriptor
class CountWindowAverage(FlatMapFunction):
    """Keyed stateful flat-map over rows (id, hour_key, name, age).

    Per key, accumulates a (count, running sum of age) pair in keyed
    ValueState; once at least two records have been seen, emits a tuple
    (id, count, sum) and resets the state for that key.
    """

    def __init__(self):
        # ValueState handle; populated in open() once a runtime context exists.
        self.sum = None

    def open(self, runtime_context: RuntimeContext):
        # Register the keyed state slot holding the (count, accumulated) pair.
        state_descriptor = ValueStateDescriptor(
            "average",
            Types.TUPLE([Types.INT(), Types.INT()]),
        )
        self.sum = runtime_context.get_state(state_descriptor)

    def flat_map(self, value):
        # State is None until the first record for this key arrives.
        accumulated = self.sum.value()
        if accumulated is None:
            accumulated = (0, 0)
        # value[3] is the "age" field of the incoming row.
        accumulated = (accumulated[0] + 1, accumulated[1] + value[3])
        self.sum.update(accumulated)
        # Emit and reset once two or more records were aggregated.
        if accumulated[0] >= 2:
            self.sum.clear()
            yield (value[0], accumulated[0], accumulated[1])
# --- Job wiring: Kafka source -> keyed stateful flat-map -> JDBC (PostgreSQL) sink ---

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_python_executable("/home/mhtuan/anaconda3/envs/flink-env/bin/python")
# NOTE(review): the jdbc-connector jar must match what the PyFlink 1.19 wrapper
# expects — it looks up JdbcOutputFormat.createRowJdbcStatementBuilder(int[])
# via reflection, and not every flink-connector-jdbc release still exposes that
# method (this is exactly the NoSuchMethodException in the stack trace).
# Verify the connector jar version against the PyFlink release being used.
env.add_jars(
    "file:///home/mhtuan/work/flink_kafka/flink_sql_kafka.jar",
    "file:///home/mhtuan/work/flink_kafka/flink-connector-jdbc-3.2.0-1.19.jar",
    "file:///home/mhtuan/work/flink_kafka/postgresql-42.7.3.jar",
)

# JSON rows coming from Kafka: (id, hour_key, name, age).
deserialization_schema = JsonRowDeserializationSchema.builder().type_info(
    type_info=Types.ROW_NAMED(
        ["id", "hour_key", "name", "age"],
        [Types.INT(), Types.STRING(), Types.STRING(), Types.INT()],
    )
).build()

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9091,localhost:9092,localhost:9093") \
    .set_topics("test-flink-3") \
    .set_group_id("flink-1") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(deserialization_schema) \
    .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

# One shared type info for the (id, count, sum) records, used both as the
# flat_map output type and as the JDBC sink's statement type.
output_row_type = Types.ROW([Types.INT(), Types.INT(), Types.INT()])

# BUG FIX: declare the flat_map output type explicitly. Without `output_type`
# the emitted elements travel as opaque pickled Python objects, so the Java
# JDBC sink cannot map them onto the Row type info handed to it below.
ds = ds.key_by(lambda record: record["id"], key_type=Types.INT()) \
    .flat_map(CountWindowAverage(), output_type=output_row_type)

ds.add_sink(JdbcSink.sink(
    # Parameterized statement — values are bound per field, never interpolated.
    "insert into flink.flatmap_age (id, count, sum) values (?, ?, ?)",
    output_row_type,
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .with_url('jdbc:postgresql://localhost:5432/postgres')
    .with_driver_name('org.postgresql.Driver')
    .with_user_name('postgres')
    .with_password('postgres')
    .build(),
    # Flush batches every second, or every 200 rows; retry failed flushes 5x.
    JdbcExecutionOptions.builder()
    .with_batch_interval_ms(1000)
    .with_batch_size(200)
    .with_max_retries(5)
    .build()
))

env.execute("kafka_sink_example")
Всё остальное работает нормально, но при запуске этого кода я получаю следующую ошибку:
Traceback (most recent call last):
File "/home/mhtuan/work/flink_kafka/./src/main.py", line 102, in
ds.add_sink(JdbcSink.sink(
File "/home/mhtuan/work/flink-1.19.1/opt/python/pyflink.zip/pyflink/datastream/connectors/jdbc.py", line 60, in sink
File "/home/mhtuan/work/flink-1.19.1/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/home/mhtuan/work/flink-1.19.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/home/mhtuan/work/flink-1.19.1/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o174.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
Кто-нибудь знает, как это исправить? Большое спасибо!
Подробнее здесь: https://stackoverflow.com/questions/789 ... flink-jdbc
Java.lang.nosuchmethodexception в Python Flink jdbc ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Вызвано: java.lang.NoSuchMethodException:
[класс android.app.Application] - viewmodel
Anonymous » » в форуме Android - 0 Ответы
- 23 Просмотры
-
Последнее сообщение Anonymous
-