Код: Выделить всё
Pattern
.begin(PATTERN_NAME)
.times(threshold)
.within(Time.milliseconds(timeWindowMilliseconds));
< /code>
Эта работа успешно работает в течение долгого времени, но после недавнего развертывания, казалось бы, случайных случаев NullpointerException в трассировке стека CEP: < /p>
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.NullPointerException
at org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.materializeMatch(SharedBufferAccessor.java:216)
at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:423)
at org.apache.flink.cep.nfa.NFA.process(NFA.java:251)
at org.apache.flink.cep.operator.CepOperator.processEvent(CepOperator.java:404)
at org.apache.flink.cep.operator.CepOperator.lambda$onEventTime$0(CepOperator.java:315)
< /code>
Исключение выкладывается в следующую строку, которая обращается к общему буферу, который использует CEP: < /p>
V event = sharedBuffer.getEvent(eventId).getElement();
< /code>
Некоторый дополнительный контекст: < /p>
[list]
[*] Flink версия 1.16.0 < /li>
Образец запускается на ключевом потоке данных, полученном из Confluent Kafka, сериализованной с AVRO с использованием обработки времени событий
[*] контрольно -пропускной пункт, поддерживаемая RockSDB, первоначальный государственный поиск, по -видимому, успешно
[*] Изменения с момента последнего развертывания
Параллелизм был увеличен с 1 до 2 < /li>
Схема конфликта AVRO для CourtomeWent Изменено, чтобы добавить новое необязательное поле со значением по умолчанию null
timeWindowMilliseconds[/list]
< /ul>
< /li>
< /ul>
Поскольку ошибка возникает, казалось бы, случайным образом, я предполагаю, что если появится событие, это ключ с идентификатором Это уже содержит старую схему в состоянии, эта ошибка возникает. С единственным дополнением к схеме является дополнительное поле, эти схемы должны быть обратно совместимы с моим пониманием.
Любое понимание того, что может происходить здесь?>
Подробнее здесь: https://stackoverflow.com/questions/794 ... tern-stack