Использование ConcurrentQueue в устойчивой функции Azure с взаимодействием с DataverseC#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Использование ConcurrentQueue в устойчивой функции Azure с взаимодействием с Dataverse

Сообщение Anonymous »

Прежде всего, я новичок в устойчивых функциях и сейчас не понимаю всех его концепций. В частности, что можно и чего нельзя делать при совместном использовании переменных между методами. Я создаю функцию, которая должна периодически запускаться, получать некоторые данные из Microsoft Dataverse, отправлять их через некоторую логику и обновлять строки сущностей. Я пытаюсь использовать устойчивую функцию для оптимизации затрат, а также потому, что объем обрабатываемых данных будет большим, и раньше у нас были проблемы с длительной работой «базовых» функций.
Насколько я понимаю, я не могу просто передать данные, которые я получаю в форме «Entity», в ActivityFunctions из моего оркестратора, потому что это не базовый тип данных и не может быть (де)сериализован. Итак, я пытаюсь использовать статическую ConcurrentQueue в классе, который заполняю оркестратор и удаляю из очереди в действии. Поскольку размер очереди становится меньше, я предполагаю, что эта часть, вероятно, не лучшее решение, но работает. Я не знаю, приносят ли эти занятия что-нибудь полезное. Что я точно знаю, так это то, что после завершения вызовов параллельных действий функция зависнет в «await Task.WhenAll», ничего не делая видимым.
Пожалуйста, помогите мне понять все вещи моя попытка реализации пошла не так.

Код: Выделить всё

    public class StatusSetter
{
private readonly IDataverseService _dataverseService;
private readonly IDataverseClientFactory _dataverseClientFactory;
public static Dictionary entities = new Dictionary();
public static Guid someGuid = Guid.Empty;
public static EntitySetting currentSetting;
public static int cnt = 0;
public static ConcurrentQueue entQueue = new ConcurrentQueue();

public StatusSetter(IDataverseService dataverseService, IDataverseClientFactory dataverseClientFactory)
{
_dataverseService = dataverseService;
_dataverseClientFactory = dataverseClientFactory;
}

Код: Выделить всё

       [Function(nameof(StatusSetter))]
public async Task RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(StatusSetter));

var serviceClient = _dataverseService.GetClient();

currentSetting = new EntitySetting { logicalname = "contact", columns = new List { "statuscode", "createdon" } };
await context.CallActivityAsync(nameof(RetrieveEntity));
// some more of the above with changig settings

if (!context.IsReplaying)
foreach (var contact in entities["contact"])
{
entQueue.Enqueue(contact);
}

var parallelTasks = new List();

int count = entQueue.Count;
for (int i = 0; i < count; i++)
{
Task task = context.CallActivityAsync(nameof(HandleContactBatch), "");
parallelTasks.Add(task);
}

await Task.WhenAll(parallelTasks);

// never reached
}

Код: Выделить всё

        [Function(nameof(RetrieveEntity))]
public bool RetrieveEntity([ActivityTrigger] FunctionContext executionContext)
{
var serviceClient = _dataverseService.GetClient();
EntitySetting setting = currentSetting;
entities.Add(setting.logicalname, new List());

EntityCollection collection;
QueryExpression query = new QueryExpression(setting.logicalname);
query.ColumnSet = new ColumnSet(setting.columns.ToArray());
query.PageInfo.Count = 5000;
query.PageInfo.PageNumber = 1;
query.PageInfo.PagingCookie = null;
do
{
collection = serviceClient.RetrieveMultiple(query);
entities[setting.logicalname].AddRange(collection.Entities);
query.PageInfo.PageNumber += 1;
query.PageInfo.PagingCookie = collection.PagingCookie;
}
while (collection.MoreRecords);

return true;
}

Код: Выделить всё

        [Function(nameof(HandleContactBatch))]
public string HandleContactBatch([ActivityTrigger] string dummy, FunctionContext executionContext)
{
var serviceClient = _dataverseService.GetClient();
ILogger logger = executionContext.GetLogger(nameof(HandleContactBatch));

logger.LogInformation("Start handling " + cnt++ + " remaining " + entQueue.Count); // cnt increases, entQueue.count decreases

Entity contact;
bool success = entQueue.TryDequeue(out contact);

if (!success)
{
return "";
}

// do stuff with serviceClient, don't know if it actually happens, e.g. serviceClient.Update()
Конструктор хостов:

Код: Выделить всё

var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.ConfigureCustomFunctionsWorker()
.Build();

host.Run();
Если вам понадобится еще код, дайте мне знать!

Подробнее здесь: https://stackoverflow.com/questions/789 ... nteraction
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Использование ConcurrentQueue в устойчивой функции Azure с взаимодействием с Dataverse
    Anonymous » » в форуме C#
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous
  • Как получить текущий объект в изолированной модели устойчивой функции Azure
    Anonymous » » в форуме C#
    0 Ответы
    31 Просмотры
    Последнее сообщение Anonymous
  • Как получить строку подключения в устойчивой функции Azure
    Anonymous » » в форуме C#
    0 Ответы
    12 Просмотры
    Последнее сообщение Anonymous
  • Как получить строку подключения в устойчивой функции Azure
    Anonymous » » в форуме C#
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Сериализация ввода в устойчивой функции с помощью Orchestrator
    Anonymous » » в форуме C#
    0 Ответы
    20 Просмотры
    Последнее сообщение Anonymous

Вернуться в «C#»