Ошибка десериализации в Kafka JMS Consumer: «Неизвестный магический байт 123»JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Ошибка десериализации в Kafka JMS Consumer: «Неизвестный магический байт 123»

Сообщение Anonymous »

Я работаю над Java-приложением, которое получает сообщения из темы Kafka с помощью прослушивателя JMS в среде Spring. Приложение должно обрабатывать пользовательские события из темы USER_EVENT. Вот как реализован мой UserEventListener:

Код: Выделить всё

@Named
@Singleton
@Immutable
@ThreadSafe
public class UserEventListener extends MessageTypeInstrumentedMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);

private final Map handler = userEventHandlersMap.get(event.getType());
handler.handle(event);
} catch (Exception e) {
LOG.error("Unable to process user event: {}", messageStr, e);
}
}

private KafkaEventWrapper decode(final String message) {
return JsonTransformer.UGLY.fromJson(message, KafkaEventWrapper.class);
}
}
Конфигурация worker-context.xml для прослушивателя следующая: Потребительские свойства Kafka устанавливаются следующим образом: Несмотря на эту настройку, в журналах возникают следующие ошибки, а onMessageInternal никогда не достигается:

Код: Выделить всё

ERROR io.confluent.serializers.ProtoSerde - Tried to deserialize message with unknown magic byte 123
ERROR org.apache.kafka.connect.util.KafkaBasedLog - Error polling: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition USER_EVENT-0 at offset 5.  If needed, please seek past the record to continue consumption.
Моя основная проблема заключается в том, что это сообщение (которое, скорее всего, является тестовым сообщением, которое в какой-то момент было опубликовано вручную), отравляет меня, и я не могу хотя бы подтвердить это сообщение. Я не могу найти способ перехватить исключение десериализации.
Что я пробовал:
  • Настроил потребитель может использовать StringDeserializer как для ключа, так и для значения.
    Добавлен ExceptionListener для перехвата исключений во время приема сообщения:

Код: Выделить всё

@Named
public class UserEventExceptionListener implements ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(UserEventExceptionListener.class);

@Override
public void onException(JMSException exception) {
LOG.error("JMS Exception occurred. Continuing processing...", exception);
}
}
И обновил конфигурацию:
  • Реализован собственный SafeStringDeserializer:

Код: Выделить всё

public class SafeStringDeserializer implements Deserializer {
private static final Logger LOG = LoggerFactory.getLogger(SafeStringDeserializer.class);

@Override
public String deserialize(String topic, byte[] data) {
try {
return data == null ? null : new String(data, StandardCharsets.UTF_8);
} catch (Exception e) {
LOG.error("Failed to deserialize message from topic {}: {}", topic, e.getMessage());
return null;
}
}
}
Обновлены потребительские свойства: Несмотря на эти усилия, проблема остается.
  • Почему я все еще получаю ошибки десериализации с магическим байтом 123 даже после настройка десериализаторов для обработки JSON?
  • Как правильно настроить потребителя JMS Kafka для обработки сообщений в формате JSON без ошибок десериализации?
  • Есть ли способ обойти или обработать эти сообщения о ядовитых таблетках, чтобы потребитель мог продолжить обработку других сообщений?


Подробнее здесь: https://stackoverflow.com/questions/792 ... c-byte-123
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»