Код: Выделить всё
@Named
@Singleton
@Immutable
@ThreadSafe
public class UserEventListener extends MessageTypeInstrumentedMessageListener {
private static final Logger LOG = LoggerFactory.getLogger(UserEventListener.class);
private final Map handler = userEventHandlersMap.get(event.getType());
handler.handle(event);
} catch (Exception e) {
LOG.error("Unable to process user event: {}", messageStr, e);
}
}
private KafkaEventWrapper decode(final String message) {
return JsonTransformer.UGLY.fromJson(message, KafkaEventWrapper.class);
}
}
Код: Выделить всё
Код: Выделить всё
Код: Выделить всё
ERROR io.confluent.serializers.ProtoSerde - Tried to deserialize message with unknown magic byte 123
ERROR org.apache.kafka.connect.util.KafkaBasedLog - Error polling: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition USER_EVENT-0 at offset 5. If needed, please seek past the record to continue consumption.
Что я пробовал:
- Настроил потребитель может использовать StringDeserializer как для ключа, так и для значения.
Добавлен ExceptionListener для перехвата исключений во время приема сообщения:
Код: Выделить всё
@Named
public class UserEventExceptionListener implements ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(UserEventExceptionListener.class);
@Override
public void onException(JMSException exception) {
LOG.error("JMS Exception occurred. Continuing processing...", exception);
}
}
Код: Выделить всё
- Реализован собственный SafeStringDeserializer:
Код: Выделить всё
public class SafeStringDeserializer implements Deserializer {
private static final Logger LOG = LoggerFactory.getLogger(SafeStringDeserializer.class);
@Override
public String deserialize(String topic, byte[] data) {
try {
return data == null ? null : new String(data, StandardCharsets.UTF_8);
} catch (Exception e) {
LOG.error("Failed to deserialize message from topic {}: {}", topic, e.getMessage());
return null;
}
}
}
Код: Выделить всё
- Почему я все еще получаю ошибки десериализации с магическим байтом 123 даже после настройка десериализаторов для обработки JSON?
- Как правильно настроить потребителя JMS Kafka для обработки сообщений в формате JSON без ошибок десериализации?
- Есть ли способ обойти или обработать эти сообщения о ядовитых таблетках, чтобы потребитель мог продолжить обработку других сообщений?
Подробнее здесь: https://stackoverflow.com/questions/792 ... c-byte-123