Неожиданное исключение в задании pyflink ⇐ Apache
-
Anonymous
Неожиданное исключение в задании pyflink
Ключевая часть моей программы pyflink заключается в следующем:
_sen_ml_parse = SenMLParse() источник = KafkaSource.builder() \ .set_bootstrap_servers('kafka:9092') \ .set_topics('ETL') \ .set_value_only_deserializer(SimpleStringSchema()) \ .set_starting_offsets(KafkaOffsetsInitializer.latest()) \ .строить() ds = env.from_source(source, WatermarkStrategy.no_watermarks(), «Источник Кафки») ds = ds.map(lambda i : (eval(i)['MSGID'], eval(i)['PAYLOAD'])) \ .map(лямбда я: _sen_ml_parse.senmlparse(i)) ds.print() SenMLParse — это модуль, определенный в другом файле. Отправка этого задания прошла успешно, но через несколько секунд задание не удалось выполнить, и возникло исключение "нет модуля с именем SenMLParse" , что меня очень смутило. Мне интересно, почему произошло это исключение. Я сам могу придумать два:
[*]Модуль не был загружен должным образом. Но как он смог пройти компиляцию (отправка прошла успешно)? [*]Объект, который я объявил в первой строке, не был постоянным, поэтому манипуляции с источник данных в последних нескольких строках не смог добраться до него?
Большое спасибо, если кто-нибудь объяснит.
Ключевая часть моей программы pyflink заключается в следующем:
_sen_ml_parse = SenMLParse() источник = KafkaSource.builder() \ .set_bootstrap_servers('kafka:9092') \ .set_topics('ETL') \ .set_value_only_deserializer(SimpleStringSchema()) \ .set_starting_offsets(KafkaOffsetsInitializer.latest()) \ .строить() ds = env.from_source(source, WatermarkStrategy.no_watermarks(), «Источник Кафки») ds = ds.map(lambda i : (eval(i)['MSGID'], eval(i)['PAYLOAD'])) \ .map(лямбда я: _sen_ml_parse.senmlparse(i)) ds.print() SenMLParse — это модуль, определенный в другом файле. Отправка этого задания прошла успешно, но через несколько секунд задание не удалось выполнить, и возникло исключение "нет модуля с именем SenMLParse" , что меня очень смутило. Мне интересно, почему произошло это исключение. Я сам могу придумать два:
[*]Модуль не был загружен должным образом. Но как он смог пройти компиляцию (отправка прошла успешно)? [*]Объект, который я объявил в первой строке, не был постоянным, поэтому манипуляции с источник данных в последних нескольких строках не смог добраться до него?
Большое спасибо, если кто-нибудь объяснит.
Мобильная версия