Когда мы выполняем группировку в потоке данных, он постоянно изменяется и обновляется. Поэтому, когда я хочу передать результат в Postgres, я сначала пытаюсь преобразовать его в DataStream, как обычно, вот так:
Код: Выделить всё
DataStream finalDataStream = TableEnv.toDataStream(finalTable);
Код: Выделить всё
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$3*' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user_name, ip, city, log_time, end_time], select=[user_name, ip, city, log_time, end_time, SUM(upload) AS upload, SUM(download) AS download])Код: Выделить всё
//create temp view for final result
TableEnv.createTemporaryView("source", finalTable);
//registering the table in postgres
TableEnv.executeSql("""
CREATE TABLE sink_table {
columns ....
PRIMARY KEY (column ...) NOT ENFORCED
""" + ") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = '" + Entity.jdbcUrl + "'," +
" 'table-name' = 'sink_table'," +
" 'username' = '" + Entity.username + "'," +
" 'password' = '" + Entity.password + "'," +
" 'driver' = 'org.postgresql.Driver'" +
")"
);
// Insert into the sink table
TableEnv.executeSql("INSERT INTO sink_table " +
"SELECT columns FROM source"
);
Код: Выделить всё
but it tells me This:Caused by: org.postgresql.util.PSQLException: ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
Код: Выделить всё
TableEnv.executeSql("INSERT INTO sink_table (columns ...) " +
"SELECT columns ... " +
"FROM source " +
"ON CONFLICT (column1, column2) " +
"DO UPDATE SET " +
" column3 = EXCLUDED.column3 "
);
Код: Выделить всё
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ON' at line 4, column 1.
Подробнее здесь: https://stackoverflow.com/questions/786 ... y-in-flink
Мобильная версия