Кафка дает: «Член группы должен иметь действительный идентификатор участника, прежде чем он фактически войдет в группу пJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Кафка дает: «Член группы должен иметь действительный идентификатор участника, прежде чем он фактически войдет в группу п

Сообщение Anonymous »

Я использую Kafka для обработки сообщений на Java. Я хочу протестировать, запустив одно и то же приложение несколько раз на своем локальном компьютере. Когда я запускаю, я впервые могу начать потреблять сообщения из этой темы. Когда я запускаю второй, я получаю:
Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group

и не получайте никаких сообщений по этой теме. Если я попытаюсь запустить больше из них, у меня возникнут те же проблемы.
Конфигурация, которую я использую для Kafka:
spring:
kafka:
bootstrap-servers: kafka:9092
consumer:
auto-offset-reset: earliest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.use.type.headers: false
listener:
missing-topics-fatal: false

У меня есть две темы@Configuration
public class KafkaTopics {
@Bean("alertsTopic")
public NewTopic alertsTopic() {

return TopicBuilder.name("XXX.alerts")
.compact()
.build();
}

@Bean("serversTopic")
public NewTopic serversTopic() {

return TopicBuilder.name("XXX.servers")
.compact()
.build();
}

}

И два слушателя в разных файлах классов.
@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=java.lang.String",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
})
public void registerServer(
@Payload(required = false) ServerInfo serverInfo
)

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
})
public void processAlert(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) long offset,
@Payload(required = false) AlertOnKafka alert)



Подробнее здесь: https://stackoverflow.com/questions/639 ... ore-actual
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»