Проблема: пауза и возобновление не работают при использовании потока Spring Cloud с связующим устройством Kafka. Даже после паузы в потребительской функции я все равно вижу, как печатается полезная нагрузка.
Если я печатаю статус моей привязки, он отображается как приостановленный, но потребитель продолжает печатать.
Следующее это мой потребительский класс:
package com.example.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.messaging.Message;
import java.util.List;
import java.util.function.Function;
@Configuration
public class SampleKafkaBinderConsumer {
private static final Logger log = LoggerFactory.getLogger(SampleKafkaBinderConsumer.class);
@Autowired
BindingsLifecycleController endpoint;
@Bean
public ApplicationListener idleListener() {
return event -> {
List> bindings = endpoint.queryState("sampleErrorConsumer-in-0");
Binding bindingtemp = bindings.get(0);
log.info("Starting and pausing consumer" + s.getPayload());
if (!bindingtemp.isPaused()) {
System.out.println("Bindings is not paused");
return s;
} else {
endpoint.changeState("sampleErrorConsumer-in-0", BindingsLifecycleController.State.PAUSED);
}
return null;
};
}
}
Ниже приведен мой файл application.yml
spring:
application:
name: kafkademo
cloud:
stream:
bindings:
producerSupplier-out-0:
destination: testkafkabinderproduce
binder: kafka1
functionLogger-out-0:
binder: kafka2
destination: testkafkabinderout
functionLogger-in-0:
binder: kafka2
destination: testkafkabinderproduce
group: kstreamgroup
sampleErrorConsumer-in-0:
binder: kafka1
destination: errordefaultout
group: dlqErrorConsumer
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: 'localhost:29092'
applicationId: kafkademo
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
deserializationExceptionHandler: sendToDlq
kafka2:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
applicationId: kafkademo
brokers: 'localhost:29092'
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
kafka:
bindings:
sampleErrorConsumer-in-0:
consumer:
enable-dlq: true
idle-event-interval: 60000
function:
definition: sampleErrorConsumer;functionLogger;producerSupplier
spring:
kafka:
bootstrap-servers: 'localhost:29092'
spring.cloud.stream.kafka.streams.bindings.functionLogger-in-0.consumer.dlqName: errordefaultout
spring.cloud.stream.kafka.streams.bindings.functionLogger-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.bindings.sampleErrorConsumer-in-0.consumer.configuration.max.poll.records: 1
Файл Gradle:
plugins {
id 'java'
id 'org.springframework.boot' version '3.4.1'
id 'io.spring.dependency-management' version '1.1.7'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "2024.0.0")
}
dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.cloud:spring-cloud-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... fka-binder
Приостановить и возобновить поток потребителя в весеннем облаке с помощью связующего устройства Kafka. ⇐ JAVA
Программисты JAVA общаются здесь
1736950934
Anonymous
Проблема: пауза и возобновление не работают при использовании потока Spring Cloud с связующим устройством Kafka. Даже после паузы в потребительской функции я все равно вижу, как печатается полезная нагрузка.
Если я печатаю статус моей привязки, он отображается как приостановленный, но потребитель продолжает печатать.
Следующее это мой потребительский класс:
package com.example.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.messaging.Message;
import java.util.List;
import java.util.function.Function;
@Configuration
public class SampleKafkaBinderConsumer {
private static final Logger log = LoggerFactory.getLogger(SampleKafkaBinderConsumer.class);
@Autowired
BindingsLifecycleController endpoint;
@Bean
public ApplicationListener idleListener() {
return event -> {
List> bindings = endpoint.queryState("sampleErrorConsumer-in-0");
Binding bindingtemp = bindings.get(0);
log.info("Starting and pausing consumer" + s.getPayload());
if (!bindingtemp.isPaused()) {
System.out.println("Bindings is not paused");
return s;
} else {
endpoint.changeState("sampleErrorConsumer-in-0", BindingsLifecycleController.State.PAUSED);
}
return null;
};
}
}
Ниже приведен мой файл application.yml
spring:
application:
name: kafkademo
cloud:
stream:
bindings:
producerSupplier-out-0:
destination: testkafkabinderproduce
binder: kafka1
functionLogger-out-0:
binder: kafka2
destination: testkafkabinderout
functionLogger-in-0:
binder: kafka2
destination: testkafkabinderproduce
group: kstreamgroup
sampleErrorConsumer-in-0:
binder: kafka1
destination: errordefaultout
group: dlqErrorConsumer
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: 'localhost:29092'
applicationId: kafkademo
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
deserializationExceptionHandler: sendToDlq
kafka2:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
applicationId: kafkademo
brokers: 'localhost:29092'
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
kafka:
bindings:
sampleErrorConsumer-in-0:
consumer:
enable-dlq: true
idle-event-interval: 60000
function:
definition: sampleErrorConsumer;functionLogger;producerSupplier
spring:
kafka:
bootstrap-servers: 'localhost:29092'
spring.cloud.stream.kafka.streams.bindings.functionLogger-in-0.consumer.dlqName: errordefaultout
spring.cloud.stream.kafka.streams.bindings.functionLogger-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.bindings.sampleErrorConsumer-in-0.consumer.configuration.max.poll.records: 1
Файл Gradle:
plugins {
id 'java'
id 'org.springframework.boot' version '3.4.1'
id 'io.spring.dependency-management' version '1.1.7'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "2024.0.0")
}
dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.cloud:spring-cloud-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79358564/pause-and-resume-consumer-in-spring-cloud-stream-using-kafka-binder[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия