Насколько я понимаю, я не могу просто передать данные, которые я получаю в форме «Entity», в ActivityFunctions из моего оркестратора, потому что это не базовый тип данных и не может быть (де)сериализован. Итак, я пытаюсь использовать статическую ConcurrentQueue в классе, который заполняю оркестратор и удаляю из очереди в действии. Поскольку размер очереди становится меньше, я предполагаю, что эта часть, вероятно, не лучшее решение, но работает. Я не знаю, приносят ли эти занятия что-нибудь полезное. Что я точно знаю, так это то, что после завершения вызовов параллельных действий функция зависнет в «await Task.WhenAll», ничего не делая видимым.
Пожалуйста, помогите мне понять все вещи моя попытка реализации пошла не так.
Код: Выделить всё
public class StatusSetter
{
private readonly IDataverseService _dataverseService;
private readonly IDataverseClientFactory _dataverseClientFactory;
public static Dictionary entities = new Dictionary();
public static Guid someGuid = Guid.Empty;
public static EntitySetting currentSetting;
public static int cnt = 0;
public static ConcurrentQueue entQueue = new ConcurrentQueue();
public StatusSetter(IDataverseService dataverseService, IDataverseClientFactory dataverseClientFactory)
{
_dataverseService = dataverseService;
_dataverseClientFactory = dataverseClientFactory;
}
Код: Выделить всё
[Function(nameof(StatusSetter))]
public async Task RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(StatusSetter));
var serviceClient = _dataverseService.GetClient();
currentSetting = new EntitySetting { logicalname = "contact", columns = new List { "statuscode", "createdon" } };
await context.CallActivityAsync(nameof(RetrieveEntity));
// some more of the above with changig settings
if (!context.IsReplaying)
foreach (var contact in entities["contact"])
{
entQueue.Enqueue(contact);
}
var parallelTasks = new List();
int count = entQueue.Count;
for (int i = 0; i < count; i++)
{
Task task = context.CallActivityAsync(nameof(HandleContactBatch), "");
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// never reached
}
Код: Выделить всё
[Function(nameof(RetrieveEntity))]
public bool RetrieveEntity([ActivityTrigger] FunctionContext executionContext)
{
var serviceClient = _dataverseService.GetClient();
EntitySetting setting = currentSetting;
entities.Add(setting.logicalname, new List());
EntityCollection collection;
QueryExpression query = new QueryExpression(setting.logicalname);
query.ColumnSet = new ColumnSet(setting.columns.ToArray());
query.PageInfo.Count = 5000;
query.PageInfo.PageNumber = 1;
query.PageInfo.PagingCookie = null;
do
{
collection = serviceClient.RetrieveMultiple(query);
entities[setting.logicalname].AddRange(collection.Entities);
query.PageInfo.PageNumber += 1;
query.PageInfo.PagingCookie = collection.PagingCookie;
}
while (collection.MoreRecords);
return true;
}
Код: Выделить всё
[Function(nameof(HandleContactBatch))]
public string HandleContactBatch([ActivityTrigger] string dummy, FunctionContext executionContext)
{
var serviceClient = _dataverseService.GetClient();
ILogger logger = executionContext.GetLogger(nameof(HandleContactBatch));
logger.LogInformation("Start handling " + cnt++ + " remaining " + entQueue.Count); // cnt increases, entQueue.count decreases
Entity contact;
bool success = entQueue.TryDequeue(out contact);
if (!success)
{
return "";
}
// do stuff with serviceClient, don't know if it actually happens, e.g. serviceClient.Update()
Код: Выделить всё
var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.ConfigureCustomFunctionsWorker()
.Build();
host.Run();
Подробнее здесь: https://stackoverflow.com/questions/789 ... nteraction