Ошибка -
**
isrom resmount applicationContext. Чтобы отобразить отчет об оценке условий, повторно запустите ваше приложение с помощью «Debug». Запуск приложения не удалось
org.springframework.beans.factory.unsatisfiedDependenciesException: Ошибка создания боба с именем 'InventoryController', определенный в файле [C: \ users \ aisantra \ Downloads \ Ecommerce \ Class \ Compass \ Commerce \ Controler \ InventoryController.class]: неудовлетворенная зависимость, выраженная через параметр конструктора 0: Создание ошибок Бин с именем 'kafkastreams', определенные в ресурсе пути класса [com/example/ecommerce/config/kafkastreamsconfig.class]: не удалось создать экземпляры [org.apache.kafka.streams.kafkastreams]: фабричный метод 'kafkastreams, сброшенные с сообщением: сообщено: сообщение: Сообщение: сбросить исключение: «Разбросить исключение»: Неверная топология: топология не имеет потоковых потоков и никаких глобальных потоков, должна подписаться хотя бы на один источник Тема или глобальная таблица.
at org.springframework.beans.factory.support.construpresolver.createargumentarray (ConstructorResolver.java:804) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
в org.springframework.beans.factory.support.constructorresolver.autowireconstructor (constructorresolver.java:240) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.autowireconstructor (Abstractautowirecapablebeanfactory.java:1381) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.createbeaninstance (Abstractautowirecapablebeanfactory.java:1218) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.docreateebean (Abstractautowirecapablebeanfactory.java:563) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.createbean (Abstractautowirecapablebeanfactory.java:523) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractbeanfactory.lambda $ dogetbean $ 0 (Abstractbeanfactory.java:336) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.defaultsingletonbeanregistry.getsingleton (Defaultingletonbeanregistry.java:307) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractbeanfactory.dogetbean (Abstractbeanfactory.java:334) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support. .AbStractBeanFactory.getBean (AbstractBeanFactory.java:199) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.defaultlistablebeanfactory.instantaitesingleton (default-stistablebeanfactory.java:1122) ~ [Spring-Beans-6.2.2 .jar: 6.2.2]
at org.springframework.beans.factory.support.defaultlistablebeanfactory.preinstantiateSingleton (по умолчанию stistablebeanfactory.java:1093) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.defaultlistablebeanfactory.preinstantiatesingletons (defaultistablebeanfactory.java:1030) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.context.support.abstractapplicationcontext.finishbeanfactoryinialization (AbstractApplicationContext.java:987) ~ [Spring-Context-6.2.2.jar: 6.2.2]
at org.springframework.context.support.abstractapplicationcontext.refresh (AbstractApplicationContext.java:627) ~ [Spring-Context-6.2.2.jar: 6.2.2]
at org.springframework.boot.web.servlet.context.servletwebserverApplicationContext.REFRESH (ServletWebserverApplicationContext.java:146) ~ [Spring-Boot-3.4.2. .refresh (Springapplication.java:752) ~ [Spring-Boot-3.4.2.jar: 3.4.2]
at org.springframework.boot.springapplication.refreshcontext (Springapplication.java:439) ~ [Spring-Boot-3.4.2.jar: 3.4 .2]
at org.springframework.boot.springapplication.run (Springapplication.java:318) ~ [Spring-Boot-3.4.2.jar: 3.4.2]
at org.springframework.boot.springapplication.run (Springapplication.java:1361) ~ [Spring-Boot-3.4.2.jar: 3.4 .2]
at org.springframework.boot.springapplication.run (Springapplication.java:1350) ~ [Spring-Boot-3.4.2.jar: 3.4.2]
at com.example.ecommerce.ecommerceaplication.main (ecommerceplication.java:12) ~ [classes /: na]
вызвано : org.springframework.beans.factory.beancreationException: Ошибка создания боба с именем 'kafkastreams' определено в ресурсе пути класса [com/example/ecommerce/config/kafkastreamsconfig.class]: не удалось создать экземпляры [org.apache.kafka.streams.kafkastreams]: фабричный метод 'kafkastreams' исключение с помощью сообщения: недействительная топология: топология не имеет потоков и нет глобального потоки, должны подписаться как минимум к одной теме источника или глобальной таблице.
at org.springframework.beans.factory.support.constructorresolver.instantiate (constructorresolver.java:657) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.constructorresolver.instantiateusingFactoryMethod (ConstructorResolver.java:645) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.instantiateusingFactoryMethod (AbstractautowirecapableBeanFactory.java:1361) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.createbeaninstance (Abstractautowirecapablebeanfactory.java:1191) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.docreateebean (Abstractautowirecapablebeanfactory.java:563) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractautowirecapablebeanfactory.createbean (Abstractautowirecapablebeanfactory.java:523) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractbeanfactory.lambda $ dogetbean $ 0 (Abstractbeanfactory.java:336) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.defaultsingletonbeanregistry.getsingleton (Defaultingletonbeanregistry.java:307) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.abstractbeanfactory.dogetbean (Abstractbeanfactory.java:334) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support. .AbStractBeanFactory.getBean (AbstractBeanFactory.java:199) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.defaultlistablebeanfactory.doresolvedependency (default-listablebeanfactory.java:1573) ~ [Spring-Beans-6.2.2 .jar: 6.2.2]
at org.springframework.beans.factory.support.defaultlistablebeanfactory.ResolvedEpendency (default-lectableBeanfactory.java:1519) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.constructorresolver.resolveautowiredargument (constructorresolver.java:913) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.constructorresolver.createargumentarray (constructorresolver.java:791) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
... 21 общие кадра опустили
Вызвано: org.springframework.beans.beaninstantiationexception: не удалось создать экземпляр [org.apache.kafka.streams.kafkastreams]: Заводский метод «Kafkastreams» бросает исключение с сообщением: неверная топология: топология не имеет потоков и никаких глобальных потоков, должна подписаться как минимум к одной теме источника или глобальной таблице.
в org.springframework.beans.factory.support.simpleinstantiationstrategy.lambda $ instantiate $ 0 (SimpleInstantiationStrategy.java:199) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
AT org.springframework.beans.factory.support.simpleinstantiationstrategy.instantiateWithFactoryMethod (SimpleInstantiationStrategy.java:88) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.simpleinstantiationstrategy.instantiate (SimpleInstantiationStrategy.java:168) ~ [Spring-Beans-6.2.2.jar: 6.2.2]
at org.springframework.beans.factory.support.constructorresolver.instantiate (constructorresolver.java:653) ~ [Spring-Beans-6.2.2 > Вызван: org.apache.kafka.streams.errors.topology Exception: неверная топология: топология нет потоковые потоки и без глобальных потоков, должны подписаться по крайней мере на одну тему источника или глобальную таблицу. Streams-3.8.1.jar: na]
at org.apache.kafka.streams.kafkastreams. (kafkastreams.java:1026) ~ [kafka-streams-3.8.1.jar: na]
at org.apache.kafka.streams.kafkastream. (kafkastreams.java. : 955) ~ [kafka-streams-3.8.1.jar: na]
at org.apache.kafka.streams.kafkastreams. (kafkastreams.java:925) ~ [kafka-streams-3.8.1.jar: na]
at org.apache.kafka.streams.kafkastreams. (kafkastreams.java:907) ~ [kafka-streams-3.8.1.jar: na]
at org.apache.kafka.streams.kafkastream. (kafkastreams.java. : 837) ~ [kafka-streams-3.8.1.jar: na]
at com.example.ecommerce.config.kafkastreamsconfig.kafkastreams (kafkastreamsconfig.java:61) ~ [classes /: na]
at com.example.ecommerce.config.kafkastreamsconfig $$ springcglib $$ 0.cglib $ kafkastreams $ 1 () ~ [классы /: na]
at com.example.ecommerce.config.kafkastreamsconfig $$ .invoke () ~ [классы /: na]
at org.springframework.cglib.proxy.methodproxy.invokesuper (methodproxy.java:258) ~ [Spring-Core-6.2.2.jar: 6.2.2]
at org.springframework.context.annotation.configurationclassenhancer $ beanmethodinterceptor.intercept (configurationlassenhancer.java:348) ~ [Spring-Context-6.2.2.jar: 6.2.2]
at com.example.ecommerce.config.kafkastreamsconfig $$ springcglib $$ 0.kafkastreams () ~ [classes/: na]
at java.base/jdk.internal.reflect.nativemethodaccessormpl.invoke0 (нативный метод) ~ : na]
at java.base /jdk.internal.reflect.nativemethodaccessorimpl.invoke (nativemethodacccorsiMpl.java:77) ~ [na: na]
at java.base/jdk.internal.reflect.delegatingmethodaccessorimpl.invoke (делегирование methodacccessorimpl.java:43) ~ [na: na]
at java.base/java.lang.reflect.method.invoke (метод. ) ~ [na: na]
at org.springframework.beans.factory.support.simpleinstantiationstrategy.lambda $ instantiate $ 0 (SimpleInstantiationStrategy.java:171) ~ [Spring-Bean < /p>
Kafkastreamsconfig.java file
Это файл kafka Streams, в котором есть реализация потока < /p>
< /blockquote>
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaStreamsConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapAddress;
private KafkaStreams kafkaStreams;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig() {
Map props = new HashMap();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public KStream kStream(StreamsBuilder streamsBuilder) {
String sourceTopic = "order-topics";
String destinationTopic = "inventory-topic";
//create a stream from the source kafka topic
KStream orderStream = streamsBuilder.stream(sourceTopic);
// Process the order and update the inventory
KStream inventoryStream = orderStream.mapValues(orderQuantity-> -orderQuantity);// reduce based on orders
inventoryStream.to(destinationTopic, Produced.with(Serdes.String(),Serdes.Long()));
return inventoryStream;
}
@Bean
public StreamsBuilder streamsBuilder(){
return new StreamsBuilder();
}
@Bean(destroyMethod = "close") // Ensures KafkaStreams shuts down properly
public KafkaStreams kafkaStreams(StreamsBuilder streamsBuilder) {
kafkaStreams = new KafkaStreams(streamsBuilder.build(), kStreamsConfig().asProperties());
// Graceful Shutdown Hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down Kafka Streams...");
kafkaStreams.close();
System.out.println("Kafka Streams shut down successfully.");
}));
kafkaStreams.start();
return kafkaStreams;
}
@PreDestroy
public void closeKafkaStreams() {
if (kafkaStreams != null) {
System.out.println("PreDestroy: Closing Kafka Streams...");
kafkaStreams.close();
System.out.println("Kafka Streams closed.");
}
}
}
< /code>
inventorystreamprocessor.java file < /p>
< /blockquote>
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
@Component
public class InventoryStreamProcessor {
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {
KStream orderStream = streamsBuilder
.stream("order-topics", Consumed.with(Serdes.String(), Serdes.Integer()));
KTable inventoryTable = orderStream
.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
.aggregate(
() -> 100, // Default inventory count for each product
(key, orderQty, currentInventory) -> currentInventory - orderQty,
Materialized.as("inventory-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer())
);
inventoryTable.toStream().to("inventory-topic");
}
}
< /code>
контроллер < /p>
< /blockquote>
package com.example.Ecommerce.controller;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/inventory")
public class InventoryController {
private final KafkaStreams kafkaStreams;
public InventoryController(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
@GetMapping("/{productId}")
public Integer getInventory(@PathVariable String productId) {
ReadOnlyKeyValueStore inventoryStore = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("inventory-store", QueryableStoreTypes.keyValueStore())
);
Integer stock = inventoryStore.get(productId);
return (stock!=null)?stock:0;
}
}
< /code>
Основной файл < /p>
< /blockquote>
package com.example.Ecommerce;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@SpringBootApplication
public class EcommerceApplication {
public static void main(String[] args) {
SpringApplication.run(EcommerceApplication.class, args);
}
}```
> application.properties
spring.application.name=Ecommerce
# Kafka Configuration
kafka.bootstrap-servers=localhost:9092
kafka.consumer.group-id=order-group
kafka.consumer.auto-offset-reset=earliest
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.streams.application-id = inventory-tracking-app
Подробнее здесь: https://stackoverflow.com/questions/794 ... s-must-sub