Моя текущая реализация НЕ использует асинхронный ввод-вывод, хотя мне бы этого хотелось. Существует сложность, которая не позволяет мне найти хорошее решение.
Действительно, мой код является потребительской частью конвейера производитель/потребитель, где некоторый код создает содержимое файлов метаданных, помещает его в коллекцию блоков (я называю ее шиной), и мой код получает данные из шины одновременно из нескольких потоков.
Но что более важно, он содержит 2 отдельные фазы:
- Фаза 1 — во время выполнения фоновой задачи, которая собирает имена всех существующих файлов метаданных. На этом этапе:
- Код использует File.Exists для проверки существования файла метаданных.
- Имена посещенных файлов метаданных записываются в специальную коллекцию под названием ProcessedFiles.
- Этап 2 — после завершения вышеупомянутой фоновой задачи имена всех существующих файлов метаданных собираются в коллекцию suchFiles. На этом этапе:
- Код проверяет, существует ли файл метаданных, выполняя поиск в коллекции suchFiles.
- Имена посещенных файлов метаданных удаляются из коллекции существующих файлов.
Мне удалось относительно легко запрограммировать это с помощью Parallel.ForEach , но теперь я хочу использовать асинхронный ввод-вывод, и я застрял, поскольку в .NET Framework нет Parallel.ForEachAsync, и я должен использовать .NET Framework. Мне нужен другой подход.
Я хотел бы показать суть моего кода, главным образом, чтобы показать, как я реализую переход между фазами.
Код: Выделить всё
private class EndWorkItemsParallelState
{
public readonly StringBuilder StringBuilder = new(1000);
public readonly Guid Guid = Guid.NewGuid();
}
private Task GetEndWorkItemsConsumer(BlockingCollection endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
ConcurrentDictionary existingFiles = null;
ConcurrentDictionary processedFiles = new(C.IgnoreCase);
ConcurrentDictionary locks = [];
object modeSwitchGuard = new();
Func saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);
Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
{
MaxDegreeOfParallelism = concurrency
}, () =>
{
EndWorkItemsParallelState state = new();
if (existingFiles == null)
{
locks[state.Guid] = new ManualResetEventSlim();
}
return state;
}, (endWorkItem, loop, state) =>
{
ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);
if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}
existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;
saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}
return state;
}, state =>
{
if (locks.TryGetValue(state.Guid, out var @lock))
{
@lock.Set();
}
});
locks.Values.ForEach(o => o.Dispose());
DeleteStaleFiles(existingFiles.Keys);
void ProcessEndWorkItem(EndWorkItem endWorkItem, Func saveJsonFileIfDirty){ ... }
static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary processedFiles){ ... }
static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary existingFiles){ ... }
}, TaskCreationOptions.LongRunning);
Код: Выделить всё
if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}
existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;
saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}
Пока мне не удалось перенести эту логику на асинхронный формат. /О. Возможно, потому, что моя логика перехода слишком сложна.
Любые предложения приветствуются.
Подробнее здесь: https://stackoverflow.com/questions/789 ... o-async-io
Мобильная версия