Невозможно подключиться к реестру AWS Glue Schema из Spring BootJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно подключиться к реестру AWS Glue Schema из Spring Boot

Сообщение Anonymous »

Я хочу создать сообщение в формате avro из моего приложения в тему под названием «журналы». Я настроил реестр схемы в ap-south-1 (регион Мумбаи). Я передаю ap-south-1 в качестве значения в конфигурации производителя, но получаю сообщение «Регион не определен».
Ошибка:

Код: Выделить всё

2024-12-16T17:57:50.361+05:30 ERROR 25580 --- [consumer] [ntainer#1-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff FixedBackOff{interval=3000, currentAttempts=3, maxAttempts=2} exhausted for Normal-0@4

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.bestercapialmedia.notification.consumer.consumer.NotificationConsumer.sendNotification(com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO) throws java.util.concurrent.ExecutionException,java.lang.InterruptedException' threw exception
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.4.jar:3.2.4]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.6.jar:1.13.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.4.jar:3.2.4]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1570) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4]
Caused by: org.apache.kafka.common.KafkaException:  Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:473) ~[kafka-clients-3.7.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:294) ~[kafka-clients-3.7.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:944) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:826) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:793) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:768) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:762) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:964) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:793) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:570) ~[spring-kafka-3.2.4.jar:3.2.4]
at com.bestercapialmedia.notification.consumer.strategy.logger.DatabaseLogger.saveLog(DatabaseLogger.java:68) ~[classes/:na]
at com.bestercapialmedia.notification.consumer.strategy.logger.AbstractLogger.log(AbstractLogger.java:10) ~[classes/:na]
at com.bestercapialmedia.notification.consumer.decorator.NotificationDecorator.send(NotificationDecorator.java:29) ~[classes/:na]
at com.bestercapialmedia.notification.consumer.consumer.NotificationConsumer.sendNotification(NotificationConsumer.java:36) ~[classes/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.14.jar:6.1.14]
at org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.14.jar:6.1.14]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4]
...  12 common frames omitted
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Region is not defined in the properties
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.validateAndSetAWSRegion(GlueSchemaRegistryConfiguration.java:150) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildSchemaRegistryConfigs(GlueSchemaRegistryConfiguration.java:87) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildConfigs(GlueSchemaRegistryConfiguration.java:82) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.(GlueSchemaRegistryConfiguration.java:74) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.configure(AWSKafkaAvroSerializer.java:90) ~[schema-registry-serde-1.1.18.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:397) ~[kafka-clients-3.7.1.jar:na]
...  37 common frames omitted
Ниже приведен мой класс конфигурации:

Код: Выделить всё

@Configuration
public class KafkaConfig {

@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${aws.schema.registry.region}")
private String schemaRegistryRegion;

@Value("${aws.schema.registry.name}")
private String registryName;

@Value("${aws.schema.name}")
private String schemaName;

@Bean
public ConsumerFactory consumerFactory() {
Map config = new HashMap();

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

if (!Objects.equals(bootstrapServers, "localhost:9092")) {
config.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
config.put(SaslConfigs.SASL_JAAS_CONFIG,
"software.amazon.msk.auth.iam.IAMLoginModule required;");
config.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
"software.amazon.msk.auth.iam.IAMClientCallbackHandler");
}

config.put(JsonDeserializer.TYPE_MAPPINGS, "notification:com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO");
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO");
//        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO");
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "true");

config.put(ConsumerConfig.CLIENT_ID_CONFIG, "notification-consumer-" + UUID.randomUUID().toString());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);  // 45 seconds
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);  // 15 seconds
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 40000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);

//        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,KafkaNotificationDTO.class.getName());

// Set the actual deserializers
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());

// Configure the JsonDeserializer
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
//        config.put(JsonDeserializer.TYPE_MAPPINGS, "notification:com.bestercapialmedia.notification.consumer.dto.KafkaNotificationDTO");
//        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KafkaNotificationDTO.class.getName());

// Producer config
config.put(ProducerConfig.RETRIES_CONFIG, 3); // retry atmost 3 times
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 2000); // retry at 2 seconds intervals
config.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for ack from all brokers
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
config.putAll(getAwsSchemaRegistryConfig());

config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return new DefaultKafkaConsumerFactory(config);
}
private Map getAwsSchemaRegistryConfig() {
Map  awsConfig = new HashMap();
awsConfig.put(AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryRegion)  ;
awsConfig.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
awsConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
awsConfig.put(AWSSchemaRegistryConstants.SCHEMA_NAME, schemaName);
return awsConfig;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());

// Retry 2 times with 1 second interval
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(3000L, 2L));
factory.setCommonErrorHandler(errorHandler);

return factory;
}

Я не понимаю, почему он не устанавливает регион, хотя переменная SchemaRegistryRegion имеет правильное значение как ap-south-1 во время выполнения.
Примечание. Мое приложение одновременно является потребителем и производителем.
Я проверил трижды, и мой реестр схем находится в регионе Мумбаи, но все же он не могу к нему подключиться. Может ли кто-нибудь сказать мне, что я здесь делаю неправильно? Я искал об этом в Интернете, но так и не смог понять.

Подробнее здесь: https://stackoverflow.com/questions/792 ... pring-boot
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»