Объединение нескольких потоков IAsyncEnumerableC#

Место общения программистов C#
Ответить
Anonymous
 Объединение нескольких потоков IAsyncEnumerable

Сообщение Anonymous »

С выпуском Mediatr 10 появилась парадигма, позволяющая разработчикам создавать потоки на базе IAsyncEnumerable. Я использую эту парадигму для создания нескольких различных наблюдателей файловой системы для мониторинга нескольких папок. Для мониторинга папок я использую два разных подхода: опрос и FileSystemWatcher. В рамках моего конвейера все мониторы различных папок объединяются в один IEnumerable !_seenFileStore.Contains(f));

await Parallel.ForEachAsync(files, CancellationToken.None, async (f,t) =>
{
var info = new FileRecord(f);

_seenFileStore.Add(f);
await _publisher.Publish(new FileSeenNotification { FileInfo = info }, t);
queue.Enqueue(info);
});

// TODO: Try mixing the above parallel task with the serving task... Might be chaos...

while (!queue.IsEmpty)
{
if (queue.TryDequeue(out var result))
yield return result;
}

_logger.LogInformation("PolledFileStreamHandler watching {Directory} at: {Time}", request.Folder, DateTimeOffset.Now);

await Task.Delay(request.Interval, cancellationToken)
.ContinueWith(_ => {}, CancellationToken.None);
}
}
}

И FileSystemWatcher
public class FileSystemStreamHandler :
IStreamRequestHandler
{
private readonly ISeenFileStore _seenFileStore;
private readonly ILogger _logger;
private readonly IPublisher _publisher;
private readonly ConcurrentQueue _queue;

private Action? _tearDown;

public FileSystemStreamHandler(
ISeenFileStore seenFileStore,
ILogger logger,
IPublisher publisher)
{
_seenFileStore = seenFileStore;
_logger = logger;
_publisher = publisher;
_queue = new ConcurrentQueue();
}

public async IAsyncEnumerable Handle(
FileSystemStream request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var watcher = SetupWatcher(request.Folder, cancellationToken);

while (!cancellationToken.IsCancellationRequested)
{
if (_queue.TryDequeue(out var record))
yield return record;

await Task.Delay(100, cancellationToken)
.ContinueWith(_ => {}, CancellationToken.None);
}

TearDownWatcher(watcher);
}

private FileSystemWatcher SetupWatcher(string folder, CancellationToken cancellation)
{
var watcher = new FileSystemWatcher(folder);
watcher.NotifyFilter = NotifyFilters.Attributes
| NotifyFilters.CreationTime
| NotifyFilters.DirectoryName
| NotifyFilters.FileName
| NotifyFilters.LastAccess
| NotifyFilters.LastWrite
| NotifyFilters.Security
| NotifyFilters.Size;
watcher.EnableRaisingEvents = true;
_tearDown = (_, args) => OnWatcherOnChanged(args, cancellation);
watcher.Created += _tearDown.Invoke;

return watcher;
}

private async void OnWatcherOnChanged(FileSystemEventArgs args, CancellationToken cancellationToken)
{
var path = args.FullPath;

if (_seenFileStore.Contains(path)) return;

_seenFileStore.Add(path);

try
{
if ((File.GetAttributes(path) & FileAttributes.Directory) != 0) return;
}
catch (FileNotFoundException)
{
_logger.LogWarning("File {File} was not found. During a routine check. Will not be broadcast", path);
return;
}

var record = new FileRecord(path);
_queue.Enqueue(record);
await _publisher.Publish(new FileSeenNotification { FileInfo = record }, cancellationToken);
}

private void TearDownWatcher(FileSystemWatcher watcher)
{
if (_tearDown != null)
watcher.Created -= _tearDown.Invoke;
}
}


Наконец, вот класс, который связывает все воедино и пытается отслеживать потоки (в методе StartAsync). Вы заметите присутствие оператора Merge из System.Interactive.Async, но в настоящее время он не работает должным образом.
public class StreamedFolderWatcher : IDisposable
{
private readonly ConcurrentBag _streams;
private CancellationTokenSource? _cancellationTokenSource;
private readonly IMediator _mediator;
private readonly ILogger _logger;

public StreamedFolderWatcher(
IMediator mediator,
IEnumerable fileStreams,
ILogger logger)
{
_mediator = mediator;
_logger = logger;
_streams = new ConcurrentBag();
_cancellationTokenSource = new CancellationTokenSource();

fileStreams.ToList()
.ForEach(f => AddStream(f, _cancellationTokenSource.Token));
}

private void AddStream(
T request,
CancellationToken cancellationToken)
where T : IStreamRequest
{
_streams.Add(() => _mediator.CreateStream(request, cancellationToken));
}

public async Task StartAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);

var streams = _streams.Select(s => s()).ToList();
while (!cancellationToken.IsCancellationRequested)
{
await foreach (var file in streams.Merge().WithCancellation(cancellationToken))
{
_logger.LogInformation("Incoming file {File}", file);
}

await Task.Delay(1000, cancellationToken)
.ContinueWith(_ => {}, CancellationToken.None);
}
}

public Task StopAsync()
{
_cancellationTokenSource?.Cancel();

return Task.CompletedTask;
}

public void Dispose()
{
_cancellationTokenSource?.Dispose();
GC.SuppressFinalize(this);
}
}

Я ожидаю от поведения Merge, что если у меня есть 3 IAsyncEnumerables, каждый элемент должен быть создан сразу после его получения. Вместо этого, если я не помещу разрыв прерывания где-нибудь внутри циклов, первый извлеченный IStreamRequestHandler будет просто выполняться до бесконечности, пока токен отмены не приведет к остановке.
Как я могу объединить несколько входных IAsyncEnumerables в один долгоживущий выходной поток, который генерируется каждый раз при получении результата?
Минимальная воспроизводимая выборка
static async IAsyncEnumerable CreateSequence(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var random = new Random();
var id = Guid.NewGuid();
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMilliseconds(random.Next(100, 1000)));
yield return (id, random.Next(0, 10));
}
}

var token = new CancellationTokenSource();
var sequences = Enumerable.Range(0, 10)
.Select(_ => CreateSequence(token.Token));
var merged = sequences.Merge();

await foreach (var (id, value) in merged)
{
Console.WriteLine($"[{DateTime.Now.ToShortTimeString()}] Value {value} Emitted from {id}");
}


Подробнее здесь: https://stackoverflow.com/questions/706 ... le-streams
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»