У меня есть простое приложение Apache Flink (PyFlink), которое считывает данные из потока AWS Kinesis с помощью официального соединителя flink и Flink TableAPI https://nightlies.apache.org/flink/flin ... ster/docs/ соединители/поток данных/kinesis/
java.lang.RuntimeException: java.io.IOException: Failed to deserialize JSON
Возникает исключение. Как мне настроить приложение/коннектор так, чтобы оно игнорировало неверные записи в потоке Kinesis и продолжало работу?
Я пытался обработать это с помощью стандартного метода try/Exception, однако, поскольку это исключение, скорее всего, вызвано предварительно упакованный соединитель Flink/Kinesis .jar, это не имеет никакого эффекта.
У меня есть простое приложение Apache Flink (PyFlink), которое считывает данные из потока AWS Kinesis с помощью официального соединителя flink и Flink TableAPI https://nightlies.apache.org/flink/flink-docs-master/docs/ соединители/поток данных/kinesis/ [code]from pyflink.table import EnvironmentSettings, TableEnvironment import os import json
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" # on kda
is_local = ( True if os.environ.get("IS_LOCAL") else False ) # set this env var in your local environment
if is_local: # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;) APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" # local
def get_application_properties(): if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH): with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file: contents = file.read() properties = json.loads(contents) return properties else: print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))
def property_map(props, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]
def create_source_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
# 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_source_table(input_table_name, input_stream, input_region, stream_initpos) )
# 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_sink_table(output_table_name, output_stream, output_region, stream_initpos) )
# 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
if is_local: table_result.wait() else: # get job status through TableResult print(table_result.get_job_client().get_job_status())
if __name__ == "__main__": main() [/code] Проблема заключается в том, что иногда в потоке Kinesis присутствует неверный формат JSON, что приводит к [code]java.lang.RuntimeException: java.io.IOException: Failed to deserialize JSON[/code] Возникает исключение. Как мне настроить приложение/коннектор так, чтобы оно игнорировало неверные записи в потоке Kinesis и продолжало работу? Я пытался обработать это с помощью стандартного метода try/Exception, однако, поскольку это исключение, скорее всего, вызвано предварительно упакованный соединитель Flink/Kinesis .jar, это не имеет никакого эффекта.
У меня есть приложение Java 21, которое использует зависимости Apache Flink (версия 1.20.0) для фильтрации потока Kafka.
Когда я пытаюсь выполнить свою программу, я получаю следующую ошибку: п>
INFO...
Я хочу осуществлять потоковую передачу с веб-камеры на Kinesis Video Streams.
Я читал документацию и следовал инструкциям AWS.
После запуска DemoAppMain с помощью команды:
Я использую для подключения к AWS Kinesis. Время от времени я вижу, что в журналах есть ошибка 400, зарегистрированную в линии, которую я делаю Client.send () Post запрос. Ни одно из последствий не сработало, чтобы показать мне причину этой ошибки...
Надеюсь, у вас у всех хороший день. Для меня было бы очень важно, если бы вы могли мне помочь.
Я пытаюсь войти в систему и получаю ошибку в строке ReadedStatement, в которой говорится: «Невозможно найти символ символ: метод...
Я пытаюсь войти в систему и получаю сообщение об ошибке в строке ReadedStatement, в которой говорится: «Невозможно найти символ символа: метод подготовленныйстатемент(строка) местоположение: переменная тип cxof Переменная соединения ps никогда не...