Как выполнить вывод в службу Blob с помощью функций Durable ⇐ Python
Как выполнить вывод в службу Blob с помощью функций Durable
Я новичок в функциях Azure и функциях Durable. Я хочу написать код, в котором функция активности, вызываемая функцией оркестратора, будет вычислять некоторое значение и подключаться к службе BLOB-объектов.
Как изменить приведенный ниже код, чтобы выводить значение, сгенерированное функцией активности, в службу BLOB-объектов?
Среда, которую я использую,
[*]Функции Azure, устойчивые функции [*]Код Visual Studio [*]Модель программирования Python 2
Ниже приведен упрощенный код, который я собираюсь использовать. Пожалуйста, измените это. Между прочим, клиентские функции и функции оркестрации могут иметь мало общего друг с другом
импортировать azure.functions как func импортировать azure.durable_functions как df журнал импорта приложение = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) ### клиентская функция ### @app.route(route="оркестраторы/client_function") @app.durable_client_input(client_name="client") асинхронная защита client_function (req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse: instance_id = await client.start_new("оркестратор", нет, {}) logging.info(f"Начата оркестровка с идентификатором = '{instance_id}'.") await client.wait_for_completion_or_create_check_status_response(req, instance_id) статус = ожидайте client.get_status(instance_id) return f"выход: {status.output}" ### функция оркестратора ### @app.orchestration_trigger(context_name="context") def оркестратор (контекст: df.DurableOrchestrationContext) -> dict: test = выход context.call_activity("main", "") вернуть {"Тест": тест} ### функция активности ### @app.blob_output(arg_name="outputblob", путь="newblob/test.txt", соединение="BlobStorageConnection") @app.activity_trigger(input_name="blank") def main(blank: str, outputblob: func.Out[str]): string = "Данные успешно вставлены" logging.info(f'Функция триггера очереди Python обработала {len(string)} байт') outputblob.set(строка) вернуть «Завершено» local_setting.json
{ «IsEncrypted»: ложь, "Ценности": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "питон", "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", "BlobStorageConnection": "DefaultEndpointsProtocol=https;AccountNa***" } } сообщение об ошибке введите сюда описание изображения
Я новичок в функциях Azure и функциях Durable. Я хочу написать код, в котором функция активности, вызываемая функцией оркестратора, будет вычислять некоторое значение и подключаться к службе BLOB-объектов.
Как изменить приведенный ниже код, чтобы выводить значение, сгенерированное функцией активности, в службу BLOB-объектов?
Среда, которую я использую,
[*]Функции Azure, устойчивые функции [*]Код Visual Studio [*]Модель программирования Python 2
Ниже приведен упрощенный код, который я собираюсь использовать. Пожалуйста, измените это. Между прочим, клиентские функции и функции оркестрации могут иметь мало общего друг с другом
импортировать azure.functions как func импортировать azure.durable_functions как df журнал импорта приложение = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) ### клиентская функция ### @app.route(route="оркестраторы/client_function") @app.durable_client_input(client_name="client") асинхронная защита client_function (req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse: instance_id = await client.start_new("оркестратор", нет, {}) logging.info(f"Начата оркестровка с идентификатором = '{instance_id}'.") await client.wait_for_completion_or_create_check_status_response(req, instance_id) статус = ожидайте client.get_status(instance_id) return f"выход: {status.output}" ### функция оркестратора ### @app.orchestration_trigger(context_name="context") def оркестратор (контекст: df.DurableOrchestrationContext) -> dict: test = выход context.call_activity("main", "") вернуть {"Тест": тест} ### функция активности ### @app.blob_output(arg_name="outputblob", путь="newblob/test.txt", соединение="BlobStorageConnection") @app.activity_trigger(input_name="blank") def main(blank: str, outputblob: func.Out[str]): string = "Данные успешно вставлены" logging.info(f'Функция триггера очереди Python обработала {len(string)} байт') outputblob.set(строка) вернуть «Завершено» local_setting.json
{ «IsEncrypted»: ложь, "Ценности": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "питон", "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", "BlobStorageConnection": "DefaultEndpointsProtocol=https;AccountNa***" } } сообщение об ошибке введите сюда описание изображения
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение