Я сделал класс отправителя, используя компонент Kafkatemplate для отправки полезной нагрузки в тему
с некоторой конфигурацией в классе Senderconfiguration. < /p>
sender < /strong> class < /p>
< /strong> < /p>
@Component
public class Sender {
private static final Logger LOGGER = (Logger) LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, "1", payload);
}
}
< /code>
, senderconfiguration < /strong> class < /p>
@Configuration
public class SenderConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map producerConfigs() {
Map props = new HashMap();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory(producerConfigs());
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
< /code>
Проблема заключается в том, чтобы отправлять не в создании < /p>
Здесь.kafka:
bootstrap-servers: localhost:9092
topic:
helloworld: helloworld.t
< /code>
и просто контроллер, содержащий < /p>
@RestController
public class Controller {
protected final static String HELLOWORLD_TOPIC = "helloworld.t";
@Autowired
private Sender sender;
@RequestMapping("/send")
public String SendMessage() {
sender.send(HELLOWORLD_TOPIC, "message");
return "success";
}
}
< /code>
А исключение < /p>
2017-12-20 09:58:04.645 INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1
2017-12-20 09:58:04.645 INFO 10816 --- [nio-7060-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247
2017-12-20 09:59:04.654 ERROR 10816 --- [nio-7060-exec-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='1' and payload='message' to topic helloworld.t:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Подробнее здесь: https://stackoverflow.com/questions/479 ... g-to-topic
TimeOutException, брошенное при отправке на тему ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Плохое файл конфигурации службы или исключение, брошенное при построении объекта процессора
Anonymous » » в форуме JAVA - 0 Ответы
- 15 Просмотры
-
Последнее сообщение Anonymous
-