Послушайте мою конфигурацию:
Код: Выделить всё
spring:
cloud:
function:
definition: kafkaConsumer;kafkaProducer
stream:
kafka:
binder:
replicationFactor: 1
auto-create-topics: true
brokers: 10.0.100.10:9092
bindings:
binding-import-device:
destination: import-device-topic
content-type: text/plain
group: device
Код: Выделить всё
@Component
public class KafkaProducer implements Supplier {
@Override
public String get() {
return "kakfa_producer";
}
}
@Component
public class KafkaConsumer implements Consumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private Map emitters = new HashMap();
public SseEmitter register(String key) {
LOG.debug("Registering sse client for {}", key);
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> emitters.remove(key));
emitters.put(key, emitter);
return emitter;
}
public void unregister(String key) {
LOG.debug("Unregistering sse emitter for: {}", key);
Optional.ofNullable(emitters.get(key)).ifPresent(SseEmitter::complete);
}
@Override
public void accept(String input) {
LOG.debug("Got message from kafka stream: {}", input);
emitters
.entrySet()
.stream()
.map(Map.Entry::getValue)
.forEach((SseEmitter emitter) -> {
try {
emitter.send(event().data(input, MediaType.TEXT_PLAIN));
} catch (IOException e) {
LOG.debug("error sending sse message, {}", input);
}
});
}
}
Код: Выделить всё
streamBridge.send("binding-import-device", device.getId());
@KafkaListener(topics = { "binding-import-device" }, groupId = "device")
public void consume(String id) {
log.info("consume device: {}", id);
}
Код: Выделить всё
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-11-29T00:07:52.537+07:00 INFO 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Node -1 disconnected.
Подробнее здесь: https://stackoverflow.com/questions/792 ... sconnected