Я выполняю транзакцию обновления в базе данных Oracle, после завершения обновления в соответствующей теме должно быть опубликовано слитное сообщение Kafka.
Теперь я хотел бы найти это опубликованное сообщение, используя сообщение KEY или VALUE. Как я могу найти сообщение и распечатать его. Пожалуйста, кто-нибудь помогите мне с этим с помощью примера.
Я попробовал приведенный ниже пример, но ничего не работает:
def filter_messages(self, topic, key_filter=None,value_filter = None, timeout=10.0):
try:
# Get the list of partitions for the topic
partitions = self.consumer.list_topics(topic).topics[topic].partitions
# Create TopicPartition objects for all partitions
topic_partitions = [TopicPartition(topic, p) for p in partitions]
# Assign the consumer to all partitions
self.consumer.assign(topic_partitions)
while True:
# Poll for a new message
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue # No new message within the poll interval
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("Reached end of partition.")
break # Stop if reached the end of the partition
else:
print(f"Error while consuming message: {msg.error()}")
continue
# Decode the message key and value if present
key = msg.key().decode('utf-8') if msg.key() else None
value = msg.value().decode('utf-8') if msg.value() else None
# Check if both key and value match the target criteria
if key == key_filter and value == value_filter:
print(f"Found matching message:\nKey: {key}\nValue: {value}")
return # Stop after printing the first match
except KeyboardInterrupt:
pass
finally:
self.consumer.close()
Подробнее здесь: https://stackoverflow.com/questions/791 ... or-a-value
Я хотел бы найти сообщение в слитной теме Kafka, используя ключ или значение сообщения, а затем хотел бы распечатать соо ⇐ Python
Программы на Python
1731652944
Anonymous
Я выполняю транзакцию обновления в базе данных Oracle, после завершения обновления в соответствующей теме должно быть опубликовано слитное сообщение Kafka.
Теперь я хотел бы найти это опубликованное сообщение, используя сообщение KEY или VALUE. Как я могу найти сообщение и распечатать его. Пожалуйста, кто-нибудь помогите мне с этим с помощью примера.
Я попробовал приведенный ниже пример, но ничего не работает:
def filter_messages(self, topic, key_filter=None,value_filter = None, timeout=10.0):
try:
# Get the list of partitions for the topic
partitions = self.consumer.list_topics(topic).topics[topic].partitions
# Create TopicPartition objects for all partitions
topic_partitions = [TopicPartition(topic, p) for p in partitions]
# Assign the consumer to all partitions
self.consumer.assign(topic_partitions)
while True:
# Poll for a new message
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue # No new message within the poll interval
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("Reached end of partition.")
break # Stop if reached the end of the partition
else:
print(f"Error while consuming message: {msg.error()}")
continue
# Decode the message key and value if present
key = msg.key().decode('utf-8') if msg.key() else None
value = msg.value().decode('utf-8') if msg.value() else None
# Check if both key and value match the target criteria
if key == key_filter and value == value_filter:
print(f"Found matching message:\nKey: {key}\nValue: {value}")
return # Stop after printing the first match
except KeyboardInterrupt:
pass
finally:
self.consumer.close()
Подробнее здесь: [url]https://stackoverflow.com/questions/79191318/i-would-like-to-search-a-message-in-confluent-kafka-topic-using-a-key-or-a-value[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия