Как получить только последние записи в php kafkaPhp

Кемеровские программисты php общаются здесь
Ответить
Anonymous
 Как получить только последние записи в php kafka

Сообщение Anonymous »

Я использую arnaud-lb/php-rdkafka в качестве клиента PHP Kafka. Я использую Windows 10 и PHP 7.4. Я пытаюсь получить через потребителя только самые последние записи, но он возвращает все записи. Я не уверен, не совершает ли он смещение согласно моему следующему коду или есть какая-то другая причина. Я также пытался найти, как зафиксировать смещение, если оно не происходит автоматически, но ничего не смог понять.

Код: Выделить всё

$conf = new RdKafka\Conf();
//myConsumerGroup
$conf->set('group.id', 'myConsumerGroup'.date('Ymdhis'));
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $rk->newTopic("TestTopic", $topicConf);
//RD_KAFKA_OFFSET_BEGINNING RD_KAFKA_OFFSET_STORED
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
echo '';
while (true) {
$message = $topic->consume(0, 2*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
print_r($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
  • Если я укажу идентификатор группы = myConsumerGroup, она ничего не вернет.
  • Если я укажу RD_KAFKA_OFFSET_BEGINNING вместо RD_KAFKA_OFFSET_STORED затем он возвращает все записи с начала.
  • В приведенном выше примере кода как в RD_KAFKA_OFFSET_BEGINNING, так и в RD_KAFKA_OFFSET_STORED возвращает все записи.
  • Если я использую RD_KAFKA_OFFSET_BEGINNING и идентификатор группы «myConsumerGroup», то он возвращает все записи с начала.


Подробнее здесь: https://stackoverflow.com/questions/716 ... -php-kafka
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Php»