Приостановить и возобновить поток потребителя в весеннем облаке с помощью связующего устройства Kafka.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Приостановить и возобновить поток потребителя в весеннем облаке с помощью связующего устройства Kafka.

Сообщение Anonymous »

Проблема: пауза и возобновление не работают при использовании потока Spring Cloud с связующим устройством Kafka. Даже после паузы в потребительской функции я все равно вижу, как печатается полезная нагрузка.
Если я печатаю статус моей привязки, он отображается как приостановленный, но потребитель продолжает печатать.
Следующее это мой потребительский класс:

package com.example.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.messaging.Message;

import java.util.List;
import java.util.function.Function;

@Configuration
public class SampleKafkaBinderConsumer {
private static final Logger log = LoggerFactory.getLogger(SampleKafkaBinderConsumer.class);

@Autowired
BindingsLifecycleController endpoint;

@Bean
public ApplicationListener idleListener() {
return event -> {
List> bindings = endpoint.queryState("sampleErrorConsumer-in-0");
Binding bindingtemp = bindings.get(0);
log.info("Starting and pausing consumer" + s.getPayload());
if (!bindingtemp.isPaused()) {
System.out.println("Bindings is not paused");
return s;
} else {
endpoint.changeState("sampleErrorConsumer-in-0", BindingsLifecycleController.State.PAUSED);
}
return null;
};
}
}


Ниже приведен мой файл application.yml
spring:
application:
name: kafkademo
cloud:
stream:
bindings:
producerSupplier-out-0:
destination: testkafkabinderproduce
binder: kafka1
functionLogger-out-0:
binder: kafka2
destination: testkafkabinderout
functionLogger-in-0:
binder: kafka2
destination: testkafkabinderproduce
group: kstreamgroup
sampleErrorConsumer-in-0:
binder: kafka1
destination: errordefaultout
group: dlqErrorConsumer
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: 'localhost:29092'
applicationId: kafkademo
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
deserializationExceptionHandler: sendToDlq
kafka2:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
applicationId: kafkademo
brokers: 'localhost:29092'
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde

kafka:
bindings:
sampleErrorConsumer-in-0:
consumer:
enable-dlq: true
idle-event-interval: 60000

function:
definition: sampleErrorConsumer;functionLogger;producerSupplier

spring:
kafka:
bootstrap-servers: 'localhost:29092'

spring.cloud.stream.kafka.streams.bindings.functionLogger-in-0.consumer.dlqName: errordefaultout
spring.cloud.stream.kafka.streams.bindings.functionLogger-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.bindings.sampleErrorConsumer-in-0.consumer.configuration.max.poll.records: 1

Файл Gradle:
plugins {
id 'java'
id 'org.springframework.boot' version '3.4.1'
id 'io.spring.dependency-management' version '1.1.7'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}

repositories {
mavenCentral()
}

ext {
set('springCloudVersion', "2024.0.0")
}

dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.cloud:spring-cloud-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}

tasks.named('test') {
useJUnitPlatform()
}


Подробнее здесь: https://stackoverflow.com/questions/793 ... fka-binder
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»