Как перенести Parallel.ForEach в .NET Framework на асинхронный ввод-вывод?C#

Место общения программистов C#
Ответить
Anonymous
 Как перенести Parallel.ForEach в .NET Framework на асинхронный ввод-вывод?

Сообщение Anonymous »

Мне нужно обработать один большой репозиторий (репозиторий исходного кода) и создать/обновить (или оставить нетронутыми) около 2 миллионов файлов в другом репозитории (репозиторий метаданных).
Моя текущая реализация НЕ использует асинхронный ввод-вывод, хотя мне бы этого хотелось. Существует сложность, которая не позволяет мне найти хорошее решение.
Действительно, мой код является потребительской частью конвейера производитель/потребитель, где некоторый код создает содержимое файлов метаданных, помещает его в коллекцию блоков (я называю ее шиной), и мой код получает данные из шины одновременно из нескольких потоков.
Но что более важно, он содержит 2 отдельные фазы:
  • Фаза 1 — во время выполнения фоновой задачи, которая собирает имена всех существующих файлов метаданных. На этом этапе:
    • Код использует File.Exists для проверки существования файла метаданных.
    • Имена посещенных файлов метаданных записываются в специальную коллекцию под названием ProcessedFiles.
  • Этап 2 — после завершения вышеупомянутой фоновой задачи имена всех существующих файлов метаданных собираются в коллекцию suchFiles. На этом этапе:
    • Код проверяет, существует ли файл метаданных, выполняя поиск в коллекции suchFiles.
    • Имена посещенных файлов метаданных удаляются из коллекции существующих файлов.
Задача заключается в реализации перехода между двумя фазами без потери каких-либо «пакетов» на шине.
Мне удалось относительно легко запрограммировать это с помощью Parallel.ForEach , но теперь я хочу использовать асинхронный ввод-вывод, и я застрял, поскольку в .NET Framework нет Parallel.ForEachAsync, и я должен использовать .NET Framework. Мне нужен другой подход.
Я хотел бы показать суть моего кода, главным образом, чтобы показать, как я реализую переход между фазами.

Код: Выделить всё

private class EndWorkItemsParallelState
{
public readonly StringBuilder StringBuilder = new(1000);
public readonly Guid Guid = Guid.NewGuid();
}

private Task GetEndWorkItemsConsumer(BlockingCollection endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
ConcurrentDictionary existingFiles = null;
ConcurrentDictionary processedFiles = new(C.IgnoreCase);

ConcurrentDictionary locks = [];
object modeSwitchGuard = new();

Func saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);

Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
{
MaxDegreeOfParallelism = concurrency
}, () =>
{
EndWorkItemsParallelState state = new();
if (existingFiles == null)
{
locks[state.Guid] = new ManualResetEventSlim();
}
return state;
}, (endWorkItem, loop, state) =>
{
ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);

if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}

existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;

saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}

return state;
}, state =>
{
if (locks.TryGetValue(state.Guid, out var @lock))
{
@lock.Set();
}
});

locks.Values.ForEach(o => o.Dispose());

DeleteStaleFiles(existingFiles.Keys);

void ProcessEndWorkItem(EndWorkItem endWorkItem, Func saveJsonFileIfDirty){ ... }
static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary processedFiles){ ...  }
static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary existingFiles){ ... }
}, TaskCreationOptions.LongRunning);
Переход реализован здесь:

Код: Выделить всё

if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
locks[state.Guid].Set();
lock (modeSwitchGuard)
{
if (existingFiles == null)
{
foreach (var @lock in locks.Values)
{
@lock.Wait();
}

existingFiles = m_collectExistingFilesTask.Result;
foreach (var processedFile in processedFiles.Keys)
{
existingFiles.TryRemove(processedFile, out _);
}
processedFiles = null;

saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
}
}
}
Когда поток обнаруживает, что фаза 1 должна быть завершена, он пытается войти в критическую секцию (это удается только одному), а затем ждет, пока ВСЕ остальные потоки попадут в критическую секцию и заблокируются. это. Он знает, когда другие потоки достигают его, поскольку каждый поток имеет свой собственный экземпляр ManualResetEventSlim, который они сигнализируют, когда попадают в критическую секцию. Поток, находящийся внутри него, ожидает всех этих сигналов. Как только все установлено, это означает, что все потоки заблокированы в критической секции и можно безопасно выполнять переход между фазами.
Пока мне не удалось перенести эту логику на асинхронный формат. /О. Возможно, потому, что моя логика перехода слишком сложна.
Любые предложения приветствуются.

Подробнее здесь: https://stackoverflow.com/questions/789 ... o-async-io
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»