Подписка на несколько разделов в Центре событий Azure с помощью PythonPython

Программы на Python
Ответить
Anonymous
 Подписка на несколько разделов в Центре событий Azure с помощью Python

Сообщение Anonymous »

Я создал пространство имен концентратора событий. Внутри пространства имен концентратора событий я создал концентратор событий с 8 разделами. У него есть одна группа потребителей — $Default.
Я написал код приемника на Python, который выглядит следующим образом.

Код: Выделить всё

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
BlobCheckpointStore,
)

BLOB_STORAGE_CONNECTION_STRING = "BLOB_STORAGE_CONNECTION_STRING"
BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
EVENT_HUB_CONNECTION_STR = "EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME = "EVENT_HUB_NAME"

async def on_event(partition_context, event):
# Print the event data.
print(
'Received the event: "{}" from the partition with ID: "{}"'.format(
event.body_as_str(encoding="UTF-8"), partition_context.partition_id
)
)

# Update the checkpoint so that the program doesn't read the events
# that it has already read when you run it next time.
await partition_context.update_checkpoint(event)

async def main():
# Create an Azure blob checkpoint store to store the checkpoints.
checkpoint_store = BlobCheckpointStore.from_connection_string(
BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
)

# Create a consumer client for the event hub.
client = EventHubConsumerClient.from_connection_string(
EVENT_HUB_CONNECTION_STR,
consumer_group="$Default",
eventhub_name=EVENT_HUB_NAME,
checkpoint_store=checkpoint_store,
)
async with client:
# Call the receive method. Read from the beginning of the
# partition (starting_position: "-1")
await client.receive(on_event=on_event, starting_position="-1")

if __name__ == "__main__":
loop = asyncio.get_event_loop()
# Run the main method.
loop.run_until_complete(main())
Этот код я взял из этого документа. Теперь я запустил приведенный выше код на 5 разных виртуальных машинах. Таким образом, ожидается, что все 5 получателей должны одновременно обрабатывать 5 разных сообщений. После обработки одного сообщения свободный получатель должен принять другое сообщение. Это должно продолжаться до тех пор, пока кто-нибудь не остановит код получателя.
Проблема, с которой я столкнулся, заключается в том, что одно и то же сообщение принимается несколькими получателями и обрабатывается снова и снова. Я предполагаю, что контрольно-пропускной пункт не происходит должным образом. Но я точно не знаю, почему этого не происходит. Или, возможно, приведенный выше код не соответствует моим ожиданиям.
Вот версии, которые я использую:

Имя: azure-eventhub Версия: 5.10.1
Название: azure-eventhub-checkpointstoreblob-aio Версия: 1.1.4

Что можно Я попробую дальше?

Подробнее здесь: https://stackoverflow.com/questions/761 ... ing-python
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»