Как использовать метод Spring Kafka Acknowledgement.acknowledge() для ручной фиксацииJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как использовать метод Spring Kafka Acknowledgement.acknowledge() для ручной фиксации

Сообщение Anonymous »

Я впервые использую Spring Kafka и не могу использовать метод Acknowledgement.acknowledge() для ручной фиксации в моем потребительском коде, как указано здесь https://docs.spring.io/spring-kafka/reference/html /_reference.html#committing-offsets. Мое приложение с весенней загрузкой. Если я не использую процесс фиксации вручную, мой код работает нормально. Но когда я использую
Acknowledgement.acknowledge() для ручной фиксации, он показывает ошибку, связанную с компонентом. Также, если я неправильно использую ручную фиксацию, предложите мне правильный способ сделать это.

Сообщение об ошибке:

***************************
APPLICATION FAILED TO START
***************************

Description:

Field ack in Receiver required a bean of type 'org.springframework.kafka.support.Acknowledgment' that could not be found.

Action:

Consider defining a bean of type 'org.springframework.kafka.support.Acknowledgment' in your configuration.


Я погуглил эту ошибку и обнаружил, что мне нужно добавить @Component, но он уже есть в моем потребительском коде.

Мой потребительский код выглядит так: Receiver.java

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

@Autowired
public Acknowledgment ack;

private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "${kafka.topic.TestTopic}")
public void receive(ConsumerRecord consumerRecord){
System.out.println(consumerRecord.value());
latch.countDown();
ack.acknowledge();
}
}


Мой код производителя выглядит так: Sender.java

import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

@Autowired
private KafkaTemplate kafkaTemplate;

public void send(Map map){
kafkaTemplate.send("TestTopic", map);

}

}


РЕДАКТИРОВАТЬ 1:

Мой новый потребительский код выглядит так: Получатель. Java

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "${kafka.topic.TestTopic}", containerFactory = "kafkaManualAckListenerContainerFactory")
public void receive(ConsumerRecord consumerRecord, Acknowledgment ack){
System.out.println(consumerRecord.value());
latch.countDown();
ack.acknowledge();
}
}


Я также изменил свой класс конфигурации:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class ReceiverConfig {

@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;

@Bean
public Map consumerConfigs() throws SendGridException {
Map props = new HashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return props;

}

@Bean
public ConsumerFactory consumerFactory(){
return new DefaultKafkaConsumerFactory(consumerConfigs());
}

/*@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());

return factory;
}*/

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaManualAckListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public Receiver receiver() {
return new Receiver();
}
}


После добавленияContainerFactory = «kafkaManualAckListenerContainerFactory» в мой метод получения() я получаю следующую ошибку.

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'

Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.


Подробнее здесь: https://stackoverflow.com/questions/463 ... ual-commit
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»