Я использую Spring-boot-starter-parent версии 1.5.0.RELEASE, Spring-kafka версии 1.0.0.RELEASE и Spring-kafka- протестируйте версию 1.0.0.RELEASE в приложении, которое получает сообщения из кластера Kakfa 0.9. У меня есть модульный тест для моего потребителя, который использовал KafkaEmbedded, но он завершается неудачей, поскольку порт брокера выбирается случайным образом. Есть ли способ установить это свойство брокера без изменения версии? Или какие версии мне следует использовать, чтобы ничего не сломать?
Вот код KafkaListener и KafkaConsumerTest.
Listener.java
@Service
public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}
public CountDownLatch getLatch() {
return latch;
}
}
KafkaConsumerTest.java (РЕДАКТИРОВАТЬ)
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
public KafkaTemplate template;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private Listener listener;
@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory producerFactory = new DefaultKafkaProducerFactory(senderProps);
template = new KafkaTemplate(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}
@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}
Подробнее здесь: https://stackoverflow.com/questions/562 ... a-consumer
Как установить порт в KafkaEmbedded при модульном тестировании потребителя Spring-Kafka ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как установить порт в kafkaembedded, когда модульный тестирование Spring-Kafka потребитель
Anonymous » » в форуме JAVA - 0 Ответы
- 11 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Как установить порт в kafkaembedded, когда модульный тестирование Spring-Kafka потребитель
Anonymous » » в форуме JAVA - 0 Ответы
- 2 Просмотры
-
Последнее сообщение Anonymous
-