Как эффективно обрабатывать миллионы локальных файлов с помощью .NET Framework и асинхронного ввода-вывода?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как эффективно обрабатывать миллионы локальных файлов с помощью .NET Framework и асинхронного ввода-вывода?

Сообщение Anonymous »

Мне нужно обработать один большой репозиторий (репозиторий исходного кода) и создать/обновить (или оставить нетронутыми) около 2 миллионов файлов в другом репозитории (репозиторий метаданных).
Хотя файлов около 2 млн, большинство из них читаются только для того, чтобы убедиться, что их содержимое не нужно изменять. За один запуск процессора на самом деле может быть записано несколько сотен файлов.
Моя текущая реализация НЕ использует асинхронный ввод-вывод, хотя мне бы этого хотелось. Существует сложность, которая не позволяет мне найти хорошее решение.
Действительно, мой код является потребительской частью конвейера производитель/потребитель, где некоторый код создает содержимое файлов метаданных ( обрабатывая исходные файлы, но часть производителя не является предметом этого вопроса), помещает его в блокирующую коллекцию (я называю ее шиной), и мой код потребляет данные из шины одновременно из нескольких потоков .
Но что более важно, он состоит из двух отдельных этапов:
  • Фаза 1 – Пока выполняется фоновая задача, собирающая имена всех существующих файлов метаданных. На этом этапе:
    • Код использует File.Exists для проверки существования файла метаданных (все файлы являются локальными).
    • Имена посещенных файлов метаданных записываются в специальную коллекцию с именемprocessedFiles.
  • Этап 2. После завершения вышеупомянутой фоновой задачи имена всех существующих файлов метаданных собираются в коллекциюexistFiles. На этом этапе:
    • Код проверяет, существует ли файл метаданных, выполняя поиск в коллекции suchFiles.
    • Имена посещенных файлов метаданных удаляются из коллекции существующих файлов.
Задача заключается в реализации перехода между двумя фазами без потери каких-либо «пакетов» на шине.
Мне удалось относительно легко запрограммировать это с помощью Parallel.ForEach , но теперь я хочу использовать асинхронный ввод-вывод и задаюсь вопросом, какие у меня есть варианты.
У меня есть большое ограничение — я должен использовать .NET Framework. Я сожалею об этом, но это то, что есть.
Я хотел бы показать суть моего кода, главным образом, чтобы показать, как я реализую переход между фазами.

Код: Выделить всё

private class EndWorkItemsParallelState
{
public readonly StringBuilder StringBuilder = new(1000);
public readonly Guid Guid = Guid.NewGuid();
}

private Task GetEndWorkItemsConsumer(BlockingCollection endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
ConcurrentDictionary existingFiles = null;
ConcurrentDictionary processedFiles = new(C.IgnoreCase);

ConcurrentDictionary locks = [];
object modeSwitchGuard = new();

Func saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);

Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
{
MaxDegreeOfParallelism = concurrency
}, () =>
{
EndWorkItemsParallelState state = new();
if (existingFiles == null)
{
locks[state.Guid] = new ManualResetEventSlim();
}
return state;
}, (endWorkItem, loop, state) =>
{
ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);

if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}

existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;

saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}

return state;
}, state =>
{
if (locks.TryGetValue(state.Guid, out var @lock))
{
@lock.Set();
}
});

locks.Values.ForEach(o => o.Dispose());

DeleteStaleFiles(existingFiles.Keys);

void ProcessEndWorkItem(EndWorkItem endWorkItem, Func saveJsonFileIfDirty){ ... }
static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary processedFiles){ ... }
static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary existingFiles){ ... }
}, TaskCreationOptions.LongRunning);
Переход реализован здесь:

Код: Выделить всё

if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}

existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;

saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}
Когда поток обнаруживает, что фаза 1 должна быть завершена, он пытается войти в критическую секцию (это удается только одному), а затем ждет, пока ВСЕ остальные потоки попадут в критическую секцию и заблокируются. это. Он знает, когда другие потоки достигают его, поскольку каждый поток имеет свой собственный экземпляр ManualResetEventSlim, который они сигнализируют, когда попадают в критическую секцию. Поток, находящийся внутри него, ожидает всех этих сигналов. Как только все установлено, это означает, что все потоки заблокированы в критическом разделе и можно безопасно выполнять переход между фазами.
С момента публикации этого вопроса я нашел решение, но Интересно, можно ли его улучшить.
EDIT 1
Каждый рабочий элемент на шине содержит путь к файлу и новый контент. Обработка проверяет, существует ли файл И действительно ли новый контент отличается (путем чтения существующего файла и сравнения). Если содержимое отличается ИЛИ файл новый - записывается новый контент И создается другой файл в другом репозитории.
В конце мне нужно удалить все устаревшие файлы, т.е. те, которые никогда не посещались. Именно поэтому я собираю все существующие файлы.

Подробнее здесь: https://stackoverflow.com/questions/789 ... k-and-asyn
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»