Я хочу создать сообщение в формате avro из моего приложения в тему под названием «журналы». Я настроил реестр схемы в ap-south-1 (регион Мумбаи). Я передаю ap-south-1 в качестве значения в конфигурации производителя, но получаю сообщение «Регион не определен».
Ошибка:
2024-12-16T17:57:50.361+05:30 ERROR 25580 --- [consumer] [ntainer#1-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=3000, currentAttempts=3, maxAttempts=2} exhausted for Normal-0@4
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.bestercapialmedia.notification.consumer.consumer.NotificationConsumer.sendNotification(com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO) throws java.util.concurrent.ExecutionException,java.lang.InterruptedException' threw exception
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.4.jar:3.2.4]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.6.jar:1.13.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.4.jar:3.2.4]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1570) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:473) ~[kafka-clients-3.7.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:294) ~[kafka-clients-3.7.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:944) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:826) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:793) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:768) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:762) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:964) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:793) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:570) ~[spring-kafka-3.2.4.jar:3.2.4]
at com.bestercapialmedia.notification.consumer.strategy.logger.DatabaseLogger.saveLog(DatabaseLogger.java:68) ~[classes/:na]
at com.bestercapialmedia.notification.consumer.strategy.logger.AbstractLogger.log(AbstractLogger.java:10) ~[classes/:na]
at com.bestercapialmedia.notification.consumer.decorator.NotificationDecorator.send(NotificationDecorator.java:29) ~[classes/:na]
at com.bestercapialmedia.notification.consumer.consumer.NotificationConsumer.sendNotification(NotificationConsumer.java:36) ~[classes/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.14.jar:6.1.14]
at org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.14.jar:6.1.14]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4]
... 12 common frames omitted
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Region is not defined in the properties
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.validateAndSetAWSRegion(GlueSchemaRegistryConfiguration.java:150) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildSchemaRegistryConfigs(GlueSchemaRegistryConfiguration.java:87) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildConfigs(GlueSchemaRegistryConfiguration.java:82) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.(GlueSchemaRegistryConfiguration.java:74) ~[schema-registry-common-1.1.15.jar:na]
at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.configure(AWSKafkaAvroSerializer.java:90) ~[schema-registry-serde-1.1.18.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:397) ~[kafka-clients-3.7.1.jar:na]
... 37 common frames omitted
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${aws.schema.registry.region}")
private String schemaRegistryRegion;
@Value("${aws.schema.registry.name}")
private String registryName;
@Value("${aws.schema.name}")
private String schemaName;
@Bean
public ConsumerFactory consumerFactory() {
Map config = new HashMap();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
if (!Objects.equals(bootstrapServers, "localhost:9092")) {
config.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
config.put(SaslConfigs.SASL_JAAS_CONFIG,
"software.amazon.msk.auth.iam.IAMLoginModule required;");
config.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
"software.amazon.msk.auth.iam.IAMClientCallbackHandler");
}
config.put(JsonDeserializer.TYPE_MAPPINGS, "notification:com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO");
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO");
// config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO");
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "true");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "notification-consumer-" + UUID.randomUUID().toString());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 45 seconds
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000); // 15 seconds
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 40000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
// config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,KafkaNotificationDTO.class.getName());
// Set the actual deserializers
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
// Configure the JsonDeserializer
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
// config.put(JsonDeserializer.TYPE_MAPPINGS, "notification:com.bestercapialmedia.notification.consumer.dto.KafkaNotificationDTO");
// config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KafkaNotificationDTO.class.getName());
// Producer config
config.put(ProducerConfig.RETRIES_CONFIG, 3); // retry atmost 3 times
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 2000); // retry at 2 seconds intervals
config.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for ack from all brokers
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
config.putAll(getAwsSchemaRegistryConfig());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaConsumerFactory(config);
}
private Map getAwsSchemaRegistryConfig() {
Map awsConfig = new HashMap();
awsConfig.put(AWSSchemaRegistryConstants.AWS_REGION, schemaRegistryRegion) ;
awsConfig.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
awsConfig.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
awsConfig.put(AWSSchemaRegistryConstants.SCHEMA_NAME, schemaName);
return awsConfig;
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
// Retry 2 times with 1 second interval
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(3000L, 2L));
factory.setCommonErrorHandler(errorHandler);
return factory;
}
Я не понимаю, почему он не устанавливает регион, хотя переменная SchemaRegistryRegion имеет правильное значение как ap-south-1 во время выполнения.
Примечание. Мое приложение одновременно является потребителем и производителем.
Я проверил трижды, и мой реестр схем находится в регионе Мумбаи, но все же он не могу к нему подключиться. Может ли кто-нибудь сказать мне, что я здесь делаю неправильно? Я искал об этом в Интернете, но так и не смог понять.
Я хочу создать сообщение в формате avro из моего приложения в тему под названием «журналы». Я настроил реестр схемы в ap-south-1 (регион Мумбаи). Я передаю ap-south-1 в качестве значения в конфигурации производителя, но получаю сообщение «Регион не определен». Ошибка: [code]2024-12-16T17:57:50.361+05:30 ERROR 25580 --- [consumer] [ntainer#1-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=3000, currentAttempts=3, maxAttempts=2} exhausted for Normal-0@4
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.bestercapialmedia.notification.consumer.consumer.NotificationConsumer.sendNotification(com.bestercapitalmedia.notification.notification_service.dto.KafkaNotificationDTO) throws java.util.concurrent.ExecutionException,java.lang.InterruptedException' threw exception at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.4.jar:3.2.4] at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.6.jar:1.13.6] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.4.jar:3.2.4] at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:1570) ~[na:na] Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4] Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:473) ~[kafka-clients-3.7.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:294) ~[kafka-clients-3.7.1.jar:na] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:944) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:826) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:793) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:768) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:762) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:964) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:793) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:570) ~[spring-kafka-3.2.4.jar:3.2.4] at com.bestercapialmedia.notification.consumer.strategy.logger.DatabaseLogger.saveLog(DatabaseLogger.java:68) ~[classes/:na] at com.bestercapialmedia.notification.consumer.strategy.logger.AbstractLogger.log(AbstractLogger.java:10) ~[classes/:na] at com.bestercapialmedia.notification.consumer.decorator.NotificationDecorator.send(NotificationDecorator.java:29) ~[classes/:na] at com.bestercapialmedia.notification.consumer.consumer.NotificationConsumer.sendNotification(NotificationConsumer.java:36) ~[classes/:na] at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.1.14.jar:6.1.14] at org.springframework.kafka.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.1.14.jar:6.1.14] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4] ... 12 common frames omitted Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Region is not defined in the properties at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.validateAndSetAWSRegion(GlueSchemaRegistryConfiguration.java:150) ~[schema-registry-common-1.1.15.jar:na] at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildSchemaRegistryConfigs(GlueSchemaRegistryConfiguration.java:87) ~[schema-registry-common-1.1.15.jar:na] at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildConfigs(GlueSchemaRegistryConfiguration.java:82) ~[schema-registry-common-1.1.15.jar:na] at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.(GlueSchemaRegistryConfiguration.java:74) ~[schema-registry-common-1.1.15.jar:na] at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.configure(AWSKafkaAvroSerializer.java:90) ~[schema-registry-serde-1.1.18.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:397) ~[kafka-clients-3.7.1.jar:na] ... 37 common frames omitted [/code] Ниже приведен мой класс конфигурации: [code]@Configuration public class KafkaConfig {
// Set the actual deserializers config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class.getName());
@Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory());
// Retry 2 times with 1 second interval DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(3000L, 2L)); factory.setCommonErrorHandler(errorHandler);
return factory; }
[/code] Я не понимаю, почему он не устанавливает регион, хотя переменная SchemaRegistryRegion имеет правильное значение как ap-south-1 во время выполнения. Примечание. Мое приложение одновременно является потребителем и производителем. Я проверил трижды, и мой реестр схем находится в регионе Мумбаи, но все же он не могу к нему подключиться. Может ли кто-нибудь сказать мне, что я здесь делаю неправильно? Я искал об этом в Интернете, но так и не смог понять.
Я пытаюсь добавить службу Kafka в свои тестовые примеры SpecFlow, где я создал тестовые контейнеры Kafka и SchemaRegistry. Теперь, когда я запускаю тесты в режиме отладки, мой код подключается к реестру схемы и службе Kafka, и мой тестовый пример...
Мне нужно написать интеграционный тест для совместной работы Kafka и Schema Registry из-за использования сериализации AVRO.
Мой Maven настроен следующим образом (Apple M2 Max)
% mvn -version
Apache Maven 3.9.9...
Недавно мы добавили автоматизацию Kafka в наш проект, и благодаря созданию новой учетной записи потребителя мы смогли проводить опросы по этой теме и получать сообщения в нашем локальном хранилище. Когда я попытался отправить код в конвейер gitlab,...
Используя AWS Firehose для загрузки данных в таблицу Iceberg, управляемую AWS Glue, я не могу вставить данные временных меток.
Firehose
Я пытаюсь вставить данные с помощью следующего скрипта:
json_data = json.dumps(
{
ADF_Record : {
foo : bar...