У меня есть простое приложение Pyflink , где мне нужен основной ключ для соединения временной таблицы. Поскольку мой источник использует Avro Confluent -Format, и у него есть проблемы с первичными ключами, я использую Transformation: SQL API -> Stream Objects -> SQL API. И здесь я получаю проблему, объясняющую таблицу, атрибут времени имеет атрибут времени.sensor_readings_ddl = f"""
CREATE TABLE sensor_readings (
kafka_key_id VARCHAR not null,
...
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
)
class TsExtractor(TimestampAssigner):
def extract_timestamp(self, element, record_timestamp):
return element.ts # Extracts timestamp from the `ts` field of the event
sensors_reading_stream = (
tenv.to_data_stream(sensor_readings_tab)
.assign_timestamps_and_watermarks(
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_seconds(10)) # Define watermark strategy
.with_timestamp_assigner(TsExtractor()) # Use custom timestamp assigner
)
)
sensors_reading_schema = (Schema.new_builder().
column("kafka_key_id", DataTypes.STRING().not_null()).
...
column("ts", DataTypes.TIMESTAMP(3)).
primary_key("kafka_key_id").
watermark("ts", "ts - INTERVAL '5' SECOND").
build())
sensor_readings_view = tenv.from_data_stream(sensors_reading_stream, sensors_reading_schema)
sensor_readings_view_tab = tenv.create_temporary_view( "sensor_readings_view", sensors_reading_stream)
tumbling_w_sql = """
SELECT
sr.device_id,
das.metric_1,
das.metric_2,
TUMBLE_START(sr.ts, INTERVAL '30' SECONDS) AS window_start,
TUMBLE_END(sr.ts, INTERVAL '30' SECONDS) AS window_end,
SUM(sr.ampere_hour) AS charge_consumed
FROM sensor_readings_view FOR SYSTEM_TIME AS OF sr.ts AS sr
JOIN device_account_stats_view AS das ON sr.device_id = das.device_id
GROUP BY
TUMBLE(sr.ts, INTERVAL '30' SECONDS),
sr.device_id,
das.metric_1,
das.metric_2
"""
< /code>
Ошибка: < /p>
pyflink.util.exceptions.tableException:> org.apache.flink.table. API.TableException: Агрегат окна может быть определен только в течение столбца атрибута времени, но TimeStamp (3)> встречается. п>
Подробнее здесь: https://stackoverflow.com/questions/794 ... ream-and-s
Имея проблему преобразования атрибута времени при трансормировании между потоком и SQL API ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Имея проблему преобразования атрибута времени при трансормировании между потоком и SQL API
Anonymous » » в форуме Python - 0 Ответы
- 5 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Как взаимодействовать между традиционным потоком и потоком asyncio в Python?
Anonymous » » в форуме Python - 0 Ответы
- 44 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Как взаимодействовать между традиционным потоком и потоком asyncio в Python?
Anonymous » » в форуме Python - 0 Ответы
- 33 Просмотры
-
Последнее сообщение Anonymous
-