Я использую arnaud-lb/php-rdkafka в качестве клиента PHP Kafka. Я использую Windows 10 и PHP 7.4. Я пытаюсь получить через потребителя только самые последние записи, но он возвращает все записи. Я не уверен, не совершает ли он смещение согласно моему следующему коду или есть какая-то другая причина. Я также пытался найти, как зафиксировать смещение, если оно не происходит автоматически, но ничего не смог понять.
Я использую arnaud-lb/php-rdkafka в качестве клиента PHP Kafka. Я использую Windows 10 и PHP 7.4. Я пытаюсь получить через потребителя только самые последние записи, но он возвращает все записи. Я не уверен, не совершает ли он смещение согласно моему следующему коду или есть какая-то другая причина. Я также пытался найти, как зафиксировать смещение, если оно не происходит автоматически, но ничего не смог понять. [code]$conf = new RdKafka\Conf(); //myConsumerGroup $conf->set('group.id', 'myConsumerGroup'.date('Ymdhis')); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'broker'); $topicConf->set('auto.offset.reset', 'earliest'); $topic = $rk->newTopic("TestTopic", $topicConf); //RD_KAFKA_OFFSET_BEGINNING RD_KAFKA_OFFSET_STORED $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); echo ''; while (true) { $message = $topic->consume(0, 2*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: print_r($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } [/code] [list] [*]Если я укажу идентификатор группы = myConsumerGroup, она ничего не вернет. [*]Если я укажу RD_KAFKA_OFFSET_BEGINNING вместо RD_KAFKA_OFFSET_STORED затем он возвращает все записи с начала. [*]В приведенном выше примере кода как в RD_KAFKA_OFFSET_BEGINNING, так и в RD_KAFKA_OFFSET_STORED возвращает все записи. [*]Если я использую RD_KAFKA_OFFSET_BEGINNING и идентификатор группы «myConsumerGroup», то он возвращает все записи с начала. [/list]