Как установить порт в KafkaEmbedded при модульном тестировании потребителя Spring-KafkaJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как установить порт в KafkaEmbedded при модульном тестировании потребителя Spring-Kafka

Сообщение Anonymous »

Я использую Spring-boot-starter-parent версии 1.5.0.RELEASE, Spring-kafka версии 1.0.0.RELEASE и Spring-kafka- протестируйте версию 1.0.0.RELEASE в приложении, которое получает сообщения из кластера Kakfa 0.9. У меня есть модульный тест для моего потребителя, который использовал KafkaEmbedded, но он завершается неудачей, поскольку порт брокера выбирается случайным образом. Есть ли способ установить это свойство брокера без изменения версии? Или какие версии мне следует использовать, чтобы ничего не сломать?
Вот код KafkaListener и KafkaConsumerTest.
Listener.java
@Service
public class Listener {

private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}

public CountDownLatch getLatch() {
return latch;
}
}


KafkaConsumerTest.java (РЕДАКТИРОВАТЬ)
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

public KafkaTemplate template;

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private Listener listener;

@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory producerFactory = new DefaultKafkaProducerFactory(senderProps);
template = new KafkaTemplate(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}

@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}



Подробнее здесь: https://stackoverflow.com/questions/562 ... a-consumer
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»