Проблема Kafka: MessageConversionException: невозможно преобразовать [java.lang.String] в [my_custom_model] для GenericMJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Проблема Kafka: MessageConversionException: невозможно преобразовать [java.lang.String] в [my_custom_model] для GenericM

Сообщение Anonymous »

У меня есть серверы-производители и потребители Kafka, и когда я пытаюсь отправить сообщение, я получаю следующее исключение:

Код: Выделить всё

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mail.sender.service.senders.GmailConfirmationSenderService.confirmationMessageListener(com.mail.sender.dto.request.AccountRequest)]
Bean [com.mail.sender.service.senders.GmailConfirmationSenderService@24787445]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.mail.sender.dto.request.AccountRequest] for GenericMessage [payload={"email":"ckopo.6ygy@gmail.com","username":"asdasd-mjeesh","confirmationTokenDetails":{"token":"3fd3c1ee-20ec-420b-8ee9-11d22cd7598e","createdAt":[2016,1,25,21,34,55],"expiredAt":[2023,1,8,18,19,6,661473300]}}, headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4008ea0f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mail_confirmation_message, kafka_receivedTimestamp=1673193848466, __TypeId__=[B@68a9f1ab, kafka_groupId=account_confirmation_group_id}]
Конфигурация моего сервера-производителя

Код: Выделить всё

@Configuration
public class KafkaProducerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServersUrl;

public Map producerConfig() {
Map props = new HashMap();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "accountRequest:com.confirmation_token.model.dto.request.outgoing.AccountRequest");
return props;
}

@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory(producerConfig());
}

@Bean
public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
return new KafkaTemplate(producerFactory);
}
}
Как я отправляю сообщения

Код: Выделить всё

@Service
@Slf4j
@RequiredArgsConstructor
public class EmailConfirmationSenderServiceCommunication implements ConfirmationSender {
private final KafkaTemplate kafkaTemplate;

@Override
public void sendConfirmationToken(AccountRequest accountRequest) {
kafkaTemplate.send("mail_confirmation_message", accountRequest);
log.info("Confirmation token={} has been send", accountRequest);
}
}
Модели на стороне производителя

Код: Выделить всё

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountRequest {
private String email;
private String username;
private ConfirmationTokenDetailsRequest confirmationTokenDetails;
}

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfirmationTokenDetailsRequest {
private String token;
private LocalDateTime createdAt;
private LocalDateTime expiredAt;
}
Конфигурация моего потребительского сервера

Код: Выделить всё

@Configuration
public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServerUrl;

public Map consumerConfig() {
Map props = new HashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TYPE_MAPPINGS, "accountRequest:com.mail.sender.dto.request.AccountRequest");
return props;
}

@Bean
public ConsumerFactory  consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfig());
}

@Bean
public KafkaListenerContainerFactory listenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory listenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory();
listenerContainerFactory.setConsumerFactory(consumerFactory);
return listenerContainerFactory;
}
}
Конфигурация для темы

Код: Выделить всё

@Configuration
public class KafkaTopicConfig {

@Value("${kafka.topic.names.account.confirmation}")
private String accountMailConfirmationTopicName;

@Bean
public NewTopic accountConfirmationTopic() {
return TopicBuilder
.name(accountMailConfirmationTopicName)
.build();
}
}
Как я пытаюсь прослушивать сообщения

Код: Выделить всё

@Service
@Slf4j
@RequiredArgsConstructor
public class GmailConfirmationSenderService implements EmailSender {
private final String accountConfirmationTemplate;
private final ApplicationModelValidator applicationModelValidator;
private final JavaMailSender mailSender;

@Value("${confirmation.link.template}")
private String confirmationLink;

@KafkaListener(topics = "mail_confirmation_message", groupId = "account_confirmation_group_id")
public void confirmationMessageListener(AccountRequest accountRequest) {
String validationViolations = applicationModelValidator.validate(accountRequest);
if (!validationViolations.isBlank()) {
log.error(validationViolations);
throw new ModelValidationException(validationViolations);
}

String userConfirmationLink = String.format(
accountConfirmationTemplate,
accountRequest.getUsername(),
confirmationLink + accountRequest.getConfirmationTokenDetails().getToken());
this.send(accountRequest, userConfirmationLink);
}

...
}

Модели на стороне потребителя

Код: Выделить всё

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountRequest {
@Email
private String email;

@Length(min = 8, max = 40)
private String username;

@NotNull
@Valid
private ConfirmationTokenDetailsRequest confirmationTokenDetails;
}

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ConfirmationTokenDetailsRequest {
@NotBlank
private String token;

@PastOrPresent
private LocalDateTime createdAt;

@Future
private LocalDateTime expiredAt;
}
*В потребительских моделях есть аннотации для проверки, но они не влияют на процесс

Подробнее здесь: https://stackoverflow.com/questions/750 ... ang-string
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»