Как поместить поток таблицы в Postgres, когда он группируется в Flink?Apache

Ответить
Anonymous
 Как поместить поток таблицы в Postgres, когда он группируется в Flink?

Сообщение Anonymous »

Я использую Apache Flink 1.19 с Java 17.
Когда мы выполняем группировку в потоке данных, он постоянно изменяется и обновляется. Поэтому, когда я хочу передать результат в Postgres, я сначала пытаюсь преобразовать его в DataStream, как обычно, вот так:

Код: Выделить всё

DataStream finalDataStream = TableEnv.toDataStream(finalTable);
И обычно после этого я использую функцию карты, чтобы преобразовать ее в кортеж моих типов данных. Но проблема возникает именно здесь, в показанном коде. Мне это говорит:

Код: Выделить всё

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$3*' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user_name, ip, city, log_time, end_time], select=[user_name, ip, city, log_time, end_time, SUM(upload) AS upload, SUM(download) AS download])
поэтому я не могу преобразовать свою таблицу в поток данных. Ничего страшного, я бы попробовал то, что не требует преобразования в поток данных:

Код: Выделить всё

//create temp view for final result
TableEnv.createTemporaryView("source", finalTable);
//registering the table in postgres
TableEnv.executeSql("""
CREATE TABLE sink_table {
columns ....
PRIMARY KEY (column ...) NOT ENFORCED
""" + ") WITH (" +
"  'connector' = 'jdbc'," +
"  'url' = '" + Entity.jdbcUrl + "'," +
"  'table-name' = 'sink_table'," +
"  'username' = '" + Entity.username + "'," +
"  'password' = '" + Entity.password + "'," +
"  'driver' = 'org.postgresql.Driver'" +
")"
);
// Insert into the sink table
TableEnv.executeSql("INSERT INTO sink_table " +
"SELECT columns FROM source"
);

Код: Выделить всё

but it tells me This:Caused by: org.postgresql.util.PSQLException: ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
и если вы об этом думаете:

Код: Выделить всё

TableEnv.executeSql("INSERT INTO sink_table (columns ...) " +
"SELECT columns ... " +
"FROM source " +
"ON CONFLICT (column1, column2) " +
"DO UPDATE SET " +
"    column3 = EXCLUDED.column3  "
);
ну, я пробовал это раньше, и Flink говорит следующее:

Код: Выделить всё

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ON' at line 4, column 1.
Я действительно запутался, могу ли я попробовать что-нибудь еще?

Подробнее здесь: https://stackoverflow.com/questions/786 ... y-in-flink
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Apache»