Когда инициируя потребителя AIOKAFKA с ConsumerRebalancelistener , у нас есть процесс обработки для партии сообщений (getmany ()), полученного от Kafka. Мы добавляем этот процесс обработки в on_partitions_revoked , чтобы убедиться, что эта обработка будет завершена, когда произойдет восстановление. /> Я на самом деле не совсем уверен, что нам действительно нужен замок в этом сценарии, так что будет признателен, если люди здесь могут посоветовать по этому поводу. Пока мы используем айокафку для этого случая, я думаю, это может быть общим вопросом для Кафки. < /P>
class KafkaWrapper(ConsumerRebalanceListener):
def __init__(
self,
consumer_bootstrap_servers: List[str],
consumer_topic: str,
consumer_group_id: str,
):
self.records_lock = asyncio.Lock()
self.kafka_consumer = AIOKafkaConsumer(
bootstrap_servers=consumer_bootstrap_servers,
group_id=consumer_group_id,
)
self.kafka_consumer.subscribe(topics=[consumer_topic], listener=self)
async def process_records(self):
async with self.records_lock: # Is this lock really required??
# processing message
async def on_partitions_revoked(self, _revoked):
await self.process_records()
async def on_partitions_assigned(self, _assigned):
pass
async def _watch_kafka(self):
await self.kafka_consumer.start()
while not self.should_stop.is_set():
messages = await self.kafka_consumer.getmany()
if len(local_records_by_topic_partition) > 0:
async with self.records_lock:
self.messages = messages
await self.process_records()
Подробнее здесь: https://stackoverflow.com/questions/647 ... sumer-grou
AIOKAFKA: является ли блокировка, необходимая для обработки сообщения при рассмотрении перебалансирования группы потреби ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение