Azure Container Happs Works с интеграцией концентраторов событий бесконечно зацикливается, даже если нет новых событийPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Azure Container Happs Works с интеграцией концентраторов событий бесконечно зацикливается, даже если нет новых событий

Сообщение Anonymous »

Я использую задания Azure Container Apps с помощью триггера, управляемого событием, через концентраторы Azure Event с Blobmetadata в качестве стратегии контрольной точки. Работа запускается, как и должно быть, магазин контрольных пунктов обновляется заданием, как и должно. Проблема заключается в том, что задания запускаются сразу после окончания бесконечной петли, хотя новых событий нет. Это я подтвердил через журналы задания, для первого запуска все события регистрируются, все последующие события вообще не имеют вообще никаких журналов событий.

Код: Выделить всё

eventTriggerConfig: {
parallelism: 1
replicaCompletionCount: 1
scale: {
rules: [
{
name: 'event-hub-trigger'
type: 'azure-eventhub'
auth: [
{
secretRef: 'event-hub-connection-string'
triggerParameter: 'connection'
}
{
secretRef: 'storage-account-connection-string'
triggerParameter: 'storageConnection'
}
]
metadata: {
blobContainer: containerName
checkPointStrategy: 'blobMetadata'
consumerGroup: eventHubConsumerGroupName
eventHubName: eventHubName
connectionFromEnv: 'EVENT_HUB_CONNECTION_STRING'
storageConnectionFromEnv: 'STORAGE_ACCOUNT_CONNECTION_STRING'
activationUnprocessedEventThreshold: 1
unprocessedEventThreshold: 5
}
}
]
}
}
< /code>
Вот логика работы на основе Python: < /p>
import asyncio
from datetime import datetime, timedelta, timezone
import logging
import os

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from azure.identity.aio import DefaultAzureCredential

BLOB_STORAGE_ACCOUNT_URL = os.getenv("BLOB_STORAGE_ACCOUNT_URL")
BLOB_CONTAINER_NAME = os.getenv("BLOB_CONTAINER_NAME")
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = os.getenv("EVENT_HUB_FULLY_QUALIFIED_NAMESPACE")
EVENT_HUB_NAME = os.getenv("EVENT_HUB_NAME")
EVENT_HUB_CONSUMER_GROUP = os.getenv("EVENT_HUB_CONSUMER_GROUP")

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

credential = DefaultAzureCredential()

# Global variable to track the last event time
last_event_time = None
WAIT_DURATION = timedelta(seconds=30)

async def on_event(partition_context, event):
global last_event_time

if event is not None:
print(
'Received the event: "{}" from the partition with ID: "{}"'.format(
event.body_as_str(encoding="UTF-8"), partition_context.partition_id
)
)
else:
print(f"Received a None event from partition ID: {partition_context.partition_id}")

# Update the last event time
last_event_time = datetime.now(timezone.utc)

await partition_context.update_checkpoint(event)

async def receive():
global last_event_time

checkpoint_store = BlobCheckpointStore(
blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
container_name=BLOB_CONTAINER_NAME,
credential=credential,
)

client = EventHubConsumerClient(
fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
eventhub_name=EVENT_HUB_NAME,
consumer_group=EVENT_HUB_CONSUMER_GROUP,
checkpoint_store=checkpoint_store,
credential=credential,
)

# Initialize the last event time
last_event_time = datetime.now(timezone.utc)

async with client:
# client.receive method is a blocking call, so we run it in a separate thread.
receive_task = asyncio.create_task(
client.receive(
on_event=on_event,
starting_position="-1",
)
)

# Wait until no events are received for the specified duration
while True:
await asyncio.sleep(1)
if datetime.now(timezone.utc) - last_event_time > WAIT_DURATION:
break

# Close the client and the receive task
await client.close()
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass

# Close credential when no longer needed.
await credential.close()

def run():
loop = asyncio.get_event_loop()
loop.run_until_complete(receive())
задание зависит от Azure-eventhub-checkpointStoreblob-Aio (1.2.0) и Azure-inedity (1.21.0).


Подробнее здесь: https://stackoverflow.com/questions/796 ... essly-even
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»