Я использую Spring-Boot-Starter-parent версии 1.5.0.release , Spring-Kafka версия 1.0.0.release и Spring-Kafka-test версия 1.0.0. У меня есть модульный тест для моего потребителя, который использовал kafkaembedded , но он не работает, так как порт брокера поднимается случайным образом. Есть ли способ, которым я могу установить это свойство брокера без изменения версий? Или какие версии я должен использовать, чтобы ничего не сломать?@Service
public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}
public CountDownLatch getLatch() {
return latch;
}
}
kafkaconsumertest.java ( edit )
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
public KafkaTemplate template;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private Listener listener;
@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory producerFactory = new DefaultKafkaProducerFactory(senderProps);
template = new KafkaTemplate(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}
@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}
Подробнее здесь: https://stackoverflow.com/questions/562 ... a-consumer
Как установить порт в kafkaembedded, когда модульный тестирование Spring-Kafka потребитель ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как установить порт в kafkaembedded, когда модульный тестирование Spring-Kafka потребитель
Anonymous » » в форуме JAVA - 0 Ответы
- 12 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Как установить порт в KafkaEmbedded при модульном тестировании потребителя Spring-Kafka
Anonymous » » в форуме JAVA - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Контейнеризованный потребитель Kafka не может подключиться к контейнеру Kafka
Anonymous » » в форуме JAVA - 0 Ответы
- 42 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Контейнеризованный потребитель Kafka не может подключиться к контейнеру Kafka
Anonymous » » в форуме JAVA - 0 Ответы
- 85 Просмотры
-
Последнее сообщение Anonymous
-