AIOKAFKA: является ли блокировка, необходимая для обработки сообщения при рассмотрении перебалансирования группы потребиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 AIOKAFKA: является ли блокировка, необходимая для обработки сообщения при рассмотрении перебалансирования группы потреби

Сообщение Anonymous »

Когда инициируя потребителя AIOKAFKA с ConsumerRebalancelistener , у нас есть процесс обработки для партии сообщений (getmany ()), полученного от Kafka. Мы добавляем этот процесс обработки в on_partitions_revoked , чтобы убедиться, что эта обработка будет завершена, когда произойдет восстановление. /> Я на самом деле не совсем уверен, что нам действительно нужен замок в этом сценарии, так что будет признателен, если люди здесь могут посоветовать по этому поводу. Пока мы используем айокафку для этого случая, я думаю, это может быть общим вопросом для Кафки. < /P>
class KafkaWrapper(ConsumerRebalanceListener):
def __init__(
self,
consumer_bootstrap_servers: List[str],
consumer_topic: str,
consumer_group_id: str,
):
self.records_lock = asyncio.Lock()

self.kafka_consumer = AIOKafkaConsumer(
bootstrap_servers=consumer_bootstrap_servers,
group_id=consumer_group_id,
)
self.kafka_consumer.subscribe(topics=[consumer_topic], listener=self)

async def process_records(self):
async with self.records_lock: # Is this lock really required??
# processing message

async def on_partitions_revoked(self, _revoked):
await self.process_records()

async def on_partitions_assigned(self, _assigned):
pass

async def _watch_kafka(self):
await self.kafka_consumer.start()

while not self.should_stop.is_set():
messages = await self.kafka_consumer.getmany()

if len(local_records_by_topic_partition) > 0:
async with self.records_lock:
self.messages = messages
await self.process_records()


Подробнее здесь: https://stackoverflow.com/questions/647 ... sumer-grou
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»