Хотя файлов около 2 млн, большинство из них читаются только для того, чтобы убедиться, что их содержимое не нужно изменять. За один запуск процессора на самом деле может быть записано несколько сотен файлов.
Моя текущая реализация НЕ использует асинхронный ввод-вывод, хотя мне бы этого хотелось. Существует сложность, которая не позволяет мне найти хорошее решение.
Действительно, мой код является потребительской частью конвейера производитель/потребитель, где некоторый код создает содержимое файлов метаданных ( обрабатывая исходные файлы, но часть производителя не является предметом этого вопроса), помещает его в блокирующую коллекцию (я называю ее шиной), и мой код потребляет данные из шины одновременно из нескольких потоков .
Но что более важно, он состоит из двух отдельных этапов:
- Фаза 1 – Пока выполняется фоновая задача, собирающая имена всех существующих файлов метаданных. На этом этапе:
- Код использует File.Exists для проверки существования файла метаданных (все файлы являются локальными).
- Имена посещенных файлов метаданных записываются в специальную коллекцию с именемprocessedFiles.
- Этап 2. После завершения вышеупомянутой фоновой задачи имена всех существующих файлов метаданных собираются в коллекциюexistFiles. На этом этапе:
- Код проверяет, существует ли файл метаданных, выполняя поиск в коллекции suchFiles.
- Имена посещенных файлов метаданных удаляются из коллекции существующих файлов.
Мне удалось относительно легко запрограммировать это с помощью Parallel.ForEach , но теперь я хочу использовать асинхронный ввод-вывод и задаюсь вопросом, какие у меня есть варианты.
У меня есть большое ограничение — я должен использовать .NET Framework. Я сожалею об этом, но это то, что есть.
Я хотел бы показать суть моего кода, главным образом, чтобы показать, как я реализую переход между фазами.
Код: Выделить всё
private class EndWorkItemsParallelState
{
public readonly StringBuilder StringBuilder = new(1000);
public readonly Guid Guid = Guid.NewGuid();
}
private Task GetEndWorkItemsConsumer(BlockingCollection endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
ConcurrentDictionary existingFiles = null;
ConcurrentDictionary processedFiles = new(C.IgnoreCase);
ConcurrentDictionary locks = [];
object modeSwitchGuard = new();
Func saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);
Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
{
MaxDegreeOfParallelism = concurrency
}, () =>
{
EndWorkItemsParallelState state = new();
if (existingFiles == null)
{
locks[state.Guid] = new ManualResetEventSlim();
}
return state;
}, (endWorkItem, loop, state) =>
{
ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);
if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}
existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;
saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}
return state;
}, state =>
{
if (locks.TryGetValue(state.Guid, out var @lock))
{
@lock.Set();
}
});
locks.Values.ForEach(o => o.Dispose());
DeleteStaleFiles(existingFiles.Keys);
void ProcessEndWorkItem(EndWorkItem endWorkItem, Func saveJsonFileIfDirty){ ... }
static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary processedFiles){ ... }
static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary existingFiles){ ... }
}, TaskCreationOptions.LongRunning);
Код: Выделить всё
if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}
existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;
saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}
С момента публикации этого вопроса я нашел решение, но Интересно, можно ли его улучшить.
EDIT 1
Каждый рабочий элемент на шине содержит путь к файлу и новый контент. Обработка проверяет, существует ли файл И действительно ли новый контент отличается (путем чтения существующего файла и сравнения). Если содержимое отличается ИЛИ файл новый - записывается новый контент И создается другой файл в другом репозитории.
В конце мне нужно удалить все устаревшие файлы, т.е. те, которые никогда не посещались. Именно поэтому я собираю все существующие файлы.
Подробнее здесь: https://stackoverflow.com/questions/789 ... k-and-asyn