Проблема, с которой я столкнулся, связана с одним из кластеров. Когда я пытаюсь получить метаданные с помощью следующего кода,
List
partitions = consumer.partitionsFor(topic);
проходит более 40 секунд, и я не могу определить причину. Тот же код и конфигурация отлично работают на других кластерах всего за 300–500 мс. Все операции, включая настройки брокера и ZooKeeper, управляются с помощью Java API.
Не могу понять, почему такая задержка происходит только на этом конкретном кластере. Мы будем очень признательны за любую помощь в определении того, что может быть причиной этого.
Чтобы помочь в устранении неполадок, прежде чем ответить, учтите следующее: конфигурация, код и общая настройка отлично работают на обоих устройствах. кластеры под управлением Linux, а проблема возникает только в Windows. Это говорит о том, что проблема может заключаться не в коде или настройке, а скорее в среде хоста.
В ходе расследования я убедился, что следующие потенциальные причины не являются ответственными:
- Медленная выборка метаданных. Я считаю, что это корень проблемы, поскольку во время регистрации требуется слишком много времени для получения метаданных и получите следующую строку. Потребителю нужны метаданные о теме и разделах, чтобы определить позиции выборки. я также заметил, что выборка метаданных происходит медленно, а с Jprofiler я наблюдал задержки при updateFetchPositions, но я не знаю, как это решить.
- Задержки поиска смещения: я проверил что задержек нет и все проходит гладко.
- Накладные расходы на ребалансировку: журналы подтверждают, что процессы ребалансировки не происходят.
- Большое количество разделов: в данной настройке это не проблема, поскольку количество моих тем меньше 10.
- Регулирование брокера или Загрузка: Я уверен, что проблема не в этом, поскольку подключено только два потребителя.
- Проблемы DNS: Это маловероятно, поскольку Конфигурация bootstrap.servers использует IP-адреса вместо имена хостов.
- Защитник Windows: я исключил папку JetBrains и путь Kafka, Zookeeper из антивируса, что может вызвать задержки, но я не знаю, есть ли они брандмауэр, стоящий за ним, вызывает задержки.
Вот потребитель конфигурация:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip + ":" + KafkaConfiguration.KAFKA_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "13107200");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "536870912");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "-1");
props.put(ConsumerConfig.SEND_BUFFER_CONFIG, "-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "60000");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
props.put(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
props.put(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "FALSE");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
props.put(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, "7000");
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "300");
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "7000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "11000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "8000");
props.put(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "20000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + "consumer" + "-" + position + "-" + current_ip + "\" password=\"consumer-secret\";");
Вот конфигурация производителя:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfiguration.KAFKA_HOST + ":" + KafkaConfiguration.KAFKA_PORT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConfiguration.PRODUCER_GROUP_ID + "-" + KafkaConfiguration.KAFKA_HOST);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "56384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "0");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "1000");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
props.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, "7000");
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "300");
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "7000");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728");
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2024000000");
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "-1");
props.put(ProducerConfig.SEND_BUFFER_CONFIG, "-1");
props.put(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
props.put(ProducerConfig.RETRIES_CONFIG, "0");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer-secret\";");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
Напоследок — конфигурация брокера Kafka:
kafkaProps.setProperty("broker.id", "0");
kafkaProps.setProperty("log.dirs", zkDir);
kafkaProps.setProperty("zookeeper.connection.timeout.ms", "15000");
kafkaProps.setProperty("zookeeper.max.in.flight.requests", "10");
//kafkaProps.setProperty("zookeeper.set.acl","true");
kafkaProps.setProperty("zookeeper.connect", KafkaConfiguration.ZOOKEEPER_HOST + ":" + KafkaConfiguration.ZOOKEEPER_PORT);
kafkaProps.setProperty("offsets.topic.replication.factor", "1");
kafkaProps.setProperty("delete.topic.enable", "true");
kafkaProps.setProperty("num.network.threads", String.valueOf(this.cores));
kafkaProps.setProperty("num.io.threads", String.valueOf(this.cores * 2));
kafkaProps.setProperty("auto.delete.topics.enable", "true");
kafkaProps.setProperty("auto.create.topics.enable", "false");
kafkaProps.setProperty("auto.leader.rebalance.enable", "false");
kafkaProps.setProperty("background.threads", String.valueOf(this.cores / 2));
kafkaProps.setProperty("metrics.reporters", "");
kafkaProps.setProperty("default.api.timeout.ms", "15000");
kafkaProps.setProperty("log.flush.interval.ms", "15000");
kafkaProps.setProperty("max.connections.per.ip,", "250");
kafkaProps.setProperty("remote.fetch.max.wait.ms,", "10");
kafkaProps.setProperty("message.max.bytes", "2024000000");
kafkaProps.setProperty("socket.request.max.bytes", "2024000000");
kafkaProps.setProperty("fetch.max.bytes", "536870912");
kafkaProps.setProperty("group.consumer.heartbeat.interval.ms", "15000");
kafkaProps.setProperty("group.consumer.min.heartbeat.interval.ms", "15000");
kafkaProps.setProperty("group.consumer.max.heartbeat.interval.ms", "45000");
kafkaProps.setProperty("group.consumer.max.session.timeout.ms", "65000");
kafkaProps.setProperty("group.consumer.session.timeout.ms", "60000");
kafkaProps.setProperty("replica.fetch.wait.max.ms", "10");
kafkaProps.setProperty("replica.fetch.max.bytes", "13107200");
kafkaProps.setProperty("auto.include.jmx.reporter", "false");
kafkaProps.setProperty("request.timeout.ms", "25");
kafkaProps.setProperty("socket.receive.buffer.bytes", "-1");
kafkaProps.setProperty("compression.type", "lz4");
kafkaProps.setProperty("socket.send.buffer.bytes", "-1");
kafkaProps.setProperty("socket.connection.setup.timeout.ms", "15000");
kafkaProps.setProperty("group.initial.rebalance.delay.ms", "0");
kafkaProps.setProperty("metadata.max.idle.interval.ms", "1500");
kafkaProps.setProperty("metadata.max.retention.ms", "60480000");
kafkaProps.setProperty("session.timeout.ms", "9000");
kafkaProps.setProperty("heartbeat.interval.ms", "14000");
kafkaProps.setProperty("producer.id.expiration.ms", String.valueOf(ConsensusConfiguration.EPOCH_TRANSITION * 2 * ConsensusConfiguration.CONSENSUS_TIMER));
kafkaProps.setProperty("log.retention.ms", String.valueOf(ConsensusConfiguration.EPOCH_TRANSITION * 2 * ConsensusConfiguration.CONSENSUS_TIMER));
kafkaProps.setProperty("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
kafkaProps.setProperty("listeners", "SASL_PLAINTEXT://" + KafkaConfiguration.KAFKA_HOST + ":" + KafkaConfiguration.KAFKA_PORT);
kafkaProps.setProperty("advertised.listeners", "SASL_PLAINTEXT://" + KafkaConfiguration.KAFKA_HOST + ":" + KafkaConfiguration.KAFKA_PORT);
kafkaProps.setProperty("security.inter.broker.protocol", "SASL_PLAINTEXT");
kafkaProps.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
kafkaProps.setProperty("sasl.enabled.mechanisms", "PLAIN");
kafkaProps.setProperty("allow.everyone.if.no.acl.found", "false");
kafkaProps.setProperty("listener.name.sasl_plaintext.plain.sasl.jaas.config", getJaasConfigString());
kafkaProps.setProperty("super.users", getSuperUsers());
Подробнее здесь: https://stackoverflow.com/questions/792 ... -fetch-why