Моя проблема в том, что в одном кластере я пытаюсь получить метаданные с помощью следующей строки
Код: Выделить всё
List
partitions = consumer.partitionsFor(topic);
Код: Выделить всё
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip + ":" + KafkaConfiguration.KAFKA_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConfiguration.CONSUMER_PRIVATE_GROUP_ID + "-" + i + "-" + KafkaConfiguration.KAFKA_HOST);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "500");
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "8000");
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "500");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "consumer" + "-" + position + "-" + current_ip + "\" password=\"consumer-secret\";");
Подробнее здесь: https://stackoverflow.com/questions/792 ... -fetch-why