from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink
from pyflink.datastream.stream_execution_environment import RuntimeExecutionMode
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)
env.add_jars('file:///Users/joseph/Downloads/flink-connector-jdbc-core-4.0.0-2.0.jar',
'file:///Users/joseph/Downloads/flink-connector-jdbc-mysql-4.0.0-2.0.jar',
'file:///Users/joseph/Downloads/mysql-connector-j-8.4.0.jar')
type_info = Types.ROW([Types.STRING(), Types.INT()])
ds = env.from_collection([('GM', 300), ('Volvo', 400)], type_info=type_info)
insert_sql = 'insert into Car (brand, price) values (?, ?)'
jdbc_exe_option = JdbcExecutionOptions.builder() \
.with_batch_interval_ms(1000) \
.with_batch_size(200) \
.with_max_retries(5) \
.build()
jdbc_conn_option = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
.with_url('jdbc:mysql://localhost:3306/etlmysql') \
.with_driver_name('com.mysql.cj.jdbc.Driver') \
.with_user_name('root') \
.with_password('p@$$w0rd') \
.build()
sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option)
ds.add_sink(sink)
env.execute()
env.close()
< /code>
Но сообщение об ошибке брошено из jdbcsink.sink Line. < /p>
File "c:\VSCode_Workspace\flink-python\com\aaa\flink\flink-jdbc-test.py", line 32, in
sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\datastream\connectors\jdbc.py", line 60, in sink
j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\util\exceptions.py", line 162, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o71.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2675)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
< /code>
Для вашей информации я прикрепляю коды Java того же разъема MySQL MySQL. < /p>
JdbcExecutionOptions jdbcExeOption = JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build();
JdbcConnectionOptions jdbcConnOption = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/etlmysql")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("p@$$w0rd")
.build();
JdbcSink jdbcSink = new JdbcSinkBuilder()
.withQueryStatement(
"INSERT INTO car VALUES(?,?)",
(statement, car) -> {
statement.setString(1, car.getBrand());
statement.setInt(2, car.getPrice());
})
.withExecutionOptions(jdbcExeOption)
.buildAtLeastOnce(jdbcConnOption);
stream.sinkTo(jdbcSink);
< /code>
Как вы видите, класс разъема Java jdbcsink отличается от разъема Python jdbcsink. В кодах Java объект jdbcsink
генерируется из класса jdbcsinkbuilder , но в Python это не так. Я думаю, что эти ошибки связаны с несоответствием версии API. Есть идея решить эти ошибки?
env.execute() env.close() < /code> Но сообщение об ошибке брошено из jdbcsink.sink Line. < /p> File "c:\VSCode_Workspace\flink-python\com\aaa\flink\flink-jdbc-test.py", line 32, in sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\datastream\connectors\jdbc.py", line 60, in sink j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder', ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__ return_value = get_return_value( ^^^^^^^^^^^^^^^^^ File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\util\exceptions.py", line 162, in deco return f(*a, **kw) ^^^^^^^^^^^ File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o71.getDeclaredMethod. : java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I) at java.base/java.lang.Class.getDeclaredMethod(Class.java:2675) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) < /code> Для вашей информации я прикрепляю коды Java того же разъема MySQL MySQL. < /p> JdbcExecutionOptions jdbcExeOption = JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build();
JdbcConnectionOptions jdbcConnOption = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/etlmysql") .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("root") .withPassword("p@$$w0rd") .build();
JdbcSink jdbcSink = new JdbcSinkBuilder() .withQueryStatement( "INSERT INTO car VALUES(?,?)", (statement, car) -> { statement.setString(1, car.getBrand()); statement.setInt(2, car.getPrice()); }) .withExecutionOptions(jdbcExeOption) .buildAtLeastOnce(jdbcConnOption);
stream.sinkTo(jdbcSink); < /code> Как вы видите, класс разъема Java jdbcsink отличается от разъема Python jdbcsink. В кодах Java объект jdbcsink [/code] генерируется из класса jdbcsinkbuilder , но в Python это не так. Я думаю, что эти ошибки связаны с несоответствием версии API. Есть идея решить эти ошибки?
Я пытаюсь прочитать данные из Kafka и погрузить данные в PostgreSQL с Flink StreamingAPI с использованием Python. Я использую Flink версию 1.19.1 и PostgreSQL 14.13. Вот мой код
from pyflink.datastream.connectors.kafka import...
Я пытаюсь прочитать данные из Kafka и погрузить данные в PostgreSQL с Flink StreamingAPI с использованием Python. Я использую Flink версию 1.19.1 и PostgreSQL 14.13. Вот мой код
from pyflink.datastream.connectors.kafka import...
Пытаемся создать собственный образ с помощью GraalVM и Spring Boot. Само приложение работает, когда я использую команду mvn Spring-boot:run, но когда я создаю собственный образ и пытаюсь его запустить, я получаю следующее исключение:...
Пытаемся создать собственный образ с помощью GraalVM и Spring Boot. Само приложение работает, когда я использую команду mvn Spring-boot:run, но когда я создаю собственный образ и пытаюсь его запустить, я получаю следующее исключение:...
Попытка создать собственное изображение, используя GraAlvm и Spring Boot. Само приложение работает, когда я использую команду MVN Spring-Boot: Run, но когда я строю свое собственное изображение и пытаюсь запустить его, я получаю это исключение:...