Kafka Consumer Interceptor не регистрируется в приложении Spring Boot (Java)

Программисты JAVA общаются здесь
Ответить
Anonymous
 Kafka Consumer Interceptor не регистрируется в приложении Spring Boot

Сообщение Anonymous »

У меня есть следующий класс потребительского перехватчика Kafka (ApiGatewayKafkaConsumerInterceptor):
package com.aman.api.gateway.kafka.interceptor;

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * No-op Kafka consumer interceptor used to verify that interceptor
 * registration works for the API gateway consumers.
 *
 * <p>Kafka instantiates this class reflectively from the
 * {@code interceptor.classes} consumer property, so it must keep a public
 * no-argument constructor. The interface is parameterized (instead of the
 * original raw {@code ConsumerInterceptor}) to eliminate unchecked-type
 * warnings; type erasure keeps this binary-compatible with reflective
 * instantiation.
 */
public class ApiGatewayKafkaConsumerInterceptor implements ConsumerInterceptor<String, Object> {

/** Prints a banner so the registration is visible in the startup log. */
public ApiGatewayKafkaConsumerInterceptor(){
System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$");
System.out.println("$$$ $$$");
System.out.println("$$$ GATEWAY INTERCEPTOR REGISTERED $$$");
System.out.println("$$$ $$$");
System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$");
}

/** Pass-through: records are returned unmodified. */
@Override
public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> consumerRecords) {
return consumerRecords;
}

/** No-op: committed offsets are not tracked. */
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}

/** No-op: no resources to release. */
@Override
public void close() {
}

/** No-op: no configuration keys are consumed. */
@Override
public void configure(Map<String, ?> configs) {
}

}

и следующий класс KafkaConsumerConfig:
package com.aman.api.gateway.kafka.config;

import com.aman.api.gateway.controller.switcher.TransactionController;
import com.aman.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor;
import com.aman.api.gateway.kafka.model.ConsumerConfigType;
import com.aman.api.gateway.kafka.model.KafkaConsumerProperties;
import com.aman.common.kafka.interceptor.KafkaConsumerInterceptor;
import com.aman.common.service.ServiceLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration for the API gateway.
 *
 * <p>NOTE(review): the {@code ConsumerConfig} dump that shows
 * {@code interceptor.classes = []} also shows
 * {@code value.deserializer = StringDeserializer} and the Kafka defaults for
 * {@code session.timeout.ms} and {@code max.partition.fetch.bytes} — i.e. it
 * belongs to a consumer built by Spring Boot's auto-configuration, not by
 * {@link #requestConsumerFactory()}. Verify that every {@code @KafkaListener}
 * declares {@code containerFactory = "requestKafkaListenerContainerFactory"};
 * otherwise these properties (including the interceptor) are never applied.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

private final ServiceLogger logger = new ServiceLogger(KafkaConsumerConfig.class);

// Number of concurrent listener containers created by the factory.
@Value("${kafka.concurrency}")
int concurrency;

// Externalized Kafka settings (bootstrap servers, group ids, limits, ...).
@Autowired
KafkaConsumerProperties kafkaProperties;

/** Default consumer property map, exposed as a singleton bean. */
@Bean
public Map<String, Object> consumerConfigs() {
return getProperties(ConsumerConfigType.DEFAULT);
}

/**
 * Builds the consumer property map for the given config type.
 *
 * @param type selects the group id and max-poll-records variant
 * @return mutable map of Kafka {@link ConsumerConfig} entries
 */
private Map<String, Object> getProperties(ConsumerConfigType type) {
// TODO(review): debug print left in place to preserve behavior; route
// through ServiceLogger once its API is confirmed.
System.out.print(kafkaProperties.getMaxPartitionFetchBytes());
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStarpServers());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isEnableAutoCommit());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaProperties.getAutoCommitIntervalMs());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// NOTE(review): these two entries are overridden by the deserializer
// instances passed explicitly in requestConsumerFactory().
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaProperties.getMaxPartitionFetchBytes());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId(type));
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaProperties.getMaxPollRecords(type));
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getRequestTimeoutMS());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
System.out.println("Registering ApiGatewayKafkaConsumerInterceptor....");
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ApiGatewayKafkaConsumerInterceptor.class.getName());
return props;
}

/**
 * Prototype bean variant: builds a property map for an explicit config type
 * supplied at lookup time.
 */
@Bean(name = "consumerConfig")
@Scope(value = "prototype")
@Lazy(value = true)
public Map<String, Object> consumerConfigs(ConsumerConfigType configType) {
return getProperties(configType);
}

/**
 * Consumer factory for REQUEST-type consumers, used directly by
 * {@link #requestKafkaListenerContainerFactory()}.
 *
 * <p>FIX: the original carried {@code @ConditionalOnMissingBean} without
 * {@code @Bean}; on a plain method the condition has no effect, so the
 * inert annotation is removed.
 *
 * <p>NOTE(review): the explicit {@link StringDeserializer}/
 * {@link JsonDeserializer} instances passed here take precedence over the
 * {@code key.deserializer}/{@code value.deserializer} entries in the map.
 */
public ConsumerFactory<String, Object> requestConsumerFactory() {
Map<String, Object> props = consumerConfigs(ConsumerConfigType.REQUEST);
System.out.println("Creating a consumer factory with props = " + props);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>());
}

/**
 * Batch listener container factory; {@code @KafkaListener}s must reference
 * it by name via {@code containerFactory} to pick up these properties.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> requestKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(requestConsumerFactory());
factory.setBatchListener(true);
factory.setConcurrency(concurrency);
return factory;
}

/** Default header mapper bean for mapping Kafka headers to/from Spring messaging headers. */
@Bean
public DefaultKafkaHeaderMapper headerMapper() {
return new DefaultKafkaHeaderMapper();
}
}

Хотя свойство interceptor.classes успешно передаётся фабрике потребителей, оно по-прежнему отображается пустым, когда Kafka выводит значения ConsumerConfig, и перехватчик не регистрируется. Вот соответствующая часть вывода при запуске приложения Spring Boot:
Registering ApiGatewayKafkaConsumerInterceptor....
Creating a consumer factory with props = {key.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, spring.json.trusted.packages=*, value.deserializer=class org.springframework.kafka.support.serializer.JsonDeserializer, enable.auto.commit=true, max.poll.records=10, request.timeout.ms=60000, group.id=gateway-group, max.partition.fetch.bytes=100000000, bootstrap.servers=[127.0.0.1:9092], interceptor.classes=com.aman.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor, auto.commit.interval.ms=1000, session.timeout.ms=15000}
2024-11-10 09:57:55.359 INFO 16024 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-gateway-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = gateway-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer


Подробнее здесь: https://stackoverflow.com/questions/791 ... g-boot-app
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»