Как установить порт в KafkaEmbedded при модульном тестировании потребителя Spring-KafkaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как установить порт в KafkaEmbedded при модульном тестировании потребителя Spring-Kafka

Сообщение Anonymous »

Я использую Spring-boot-starter-parent версии 1.5.0.RELEASE, Spring-kafka версии 1.0.0.RELEASE и Spring-kafka-test версии 1.0.0.RELEASE в приложении, которое использует сообщения из кластера Kakfa 0.9. У меня есть модульный тест для моего потребителя, который использовал KafkaEmbedded, но он завершается неудачей, поскольку порт брокера выбирается случайным образом. Есть ли способ установить это свойство брокера без изменения версии? Или какие версии мне следует использовать, чтобы ничего не сломать?
Вот код KafkaListener и KafkaConsumerTest.
Listener.java
@Service
public class Listener {

private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);

@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}

public CountDownLatch getLatch() {
return latch;
}
}


KafkaConsumerTest.java (РЕДАКТИРОВАТЬ)
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

public KafkaTemplate template;

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private Listener listener;

@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory producerFactory = new DefaultKafkaProducerFactory(senderProps);
template = new KafkaTemplate(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}

@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}



Подробнее здесь: https://stackoverflow.com/questions/562 ... a-consumer
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»