Высокопроизводительное объединение шаблона производитель/потребитель ⇐ C#
Высокопроизводительное объединение шаблона производитель/потребитель
Предположим, у вас есть следующий интерфейс:
интерфейс IConflateWorkByKey { IAsyncEnumerable GetValues(); void Publish (ключ TKey, значение TValue); } Этот шаблон предназначен для описания шаблона, подобного очереди работ. Производители могут ставить элементы в очередь с помощью Publish, а один потребитель будет удалять элементы с помощью GetValues, при этом:
[*]Значения должны быть объединены по ключу. Это означает, что если производители вызывают Publish(1, "hello"), а затем Publish(1, "world") до того, как у потребителя появится возможность опустошить следующий пакет (словарь ), то следующий словарь, который получит потребитель, должен иметь ключ: 1, значение: "world". Предыдущее обновление (1, «привет») будет удалено/никогда не будет видно потребителю. [*]Publish обычно вызывается гораздо быстрее, чем работа может быть истощена. Другими словами, вызовы Publish должны быть максимально быстрыми, при этом очистка элементов не так критична к производительности. [*]Вполне вероятно, что в большинстве практических случаев, когда итератор, возвращаемый из GetValues, перемещается потребителем работы, новые элементы уже будут доступны и не будут фактическими. требуется ожидание; для этого случая было бы полезно иметь быструю оптимизацию. Однако необходимо подготовить реализацию, чтобы этого не произошло, а затем асинхронно ждать доступности новых элементов. [*]Будет только один потребитель (т. е.: GetValues будет вызываться/использоваться только 1 потребителем) [*]Publish не будет вызываться одновременно (хотя может вызываться последовательно из разных потоков)
Моя текущая реализация выглядит следующим образом:
class Conflate: IConflateWorkByKey { частный словарь? _buffered = ноль; частный объект только для чтения _lock = new(); public IAsyncEnumerable GetValues(CancellationToken ct) { while(!ct.IsCancellationRequested) { замок (_lock) { в то время как (_buffered имеет значение null) Monitor.Wait(_lock); результат вар = _buffered; _buffered = ноль; выходной возврат результата; } } } public void Publish (ключ TKey, значение TValue) { замок (_lock) { _buffered ??= новый(); _buffered[ключ] = значение; Monitor.Pulse(_lock); } } } Обратите внимание, что я готов изменить метод Publish, чтобы он возвращал ValueTask, если это будет оптимально для конкретной реализации.
В принципе, это работает нормально, но основная проблема заключается в том, что реализация GetValues здесь не является асинхронной; вызывающий поток правильно блокируется в Monitor.Wait.
Я также пробовал использовать этот шаблон с AsyncMonitor из Nito.AsyncEx, но, к сожалению, AsyncMonitor.Pulse значительно< /em> слишком медленно.
Может ли кто-нибудь придумать более умную реализацию/шаблон, который бы очень быстро с точки зрения публикации значений и в то же время позволял бы действительно асинхронное ожидание/сигнализацию изнутри GetValues?
Изменить: вот еще одна идея. Мне еще предстоит подумать, правильно ли это, и измерить производительность, но перечислю это здесь для обсуждения. Конечно, по-прежнему интересны другие идеи!
class Conflate: IConflateWorkByKey { частный словарь? _buffered = новый (); частный объект только для чтения _lock = new(); частный TaskCompletionSource? _tcs = ноль; public IAsyncEnumerable GetValues(CancellationToken ct) { while(!ct.IsCancellationRequested) { Результат словаря; в то время как (истина) { замок (_lock) { если (_buffered.Any()) { // «Быстрый путь» — следующий результат уже доступен, публикуйте напрямую, не дожидаясь результат = _buffered; _buffered = новый (); перерыв; } _tcs = новый(); } дождитесь _tcs.Task; } выходной возврат результата; } } public void Publish (ключ TKey, значение TValue) { замок (_lock) { _buffered[ключ] = значение; если (_tcs не равно нулю) { _tcs.TrySetResult(); _tcs = ноль; // «Быстрый путь», следующий вызов публикации даже не требует вызова TrySetResult(), если между ними не были удалены значения } } } }
Предположим, у вас есть следующий интерфейс:
интерфейс IConflateWorkByKey { IAsyncEnumerable GetValues(); void Publish (ключ TKey, значение TValue); } Этот шаблон предназначен для описания шаблона, подобного очереди работ. Производители могут ставить элементы в очередь с помощью Publish, а один потребитель будет удалять элементы с помощью GetValues, при этом:
[*]Значения должны быть объединены по ключу. Это означает, что если производители вызывают Publish(1, "hello"), а затем Publish(1, "world") до того, как у потребителя появится возможность опустошить следующий пакет (словарь ), то следующий словарь, который получит потребитель, должен иметь ключ: 1, значение: "world". Предыдущее обновление (1, «привет») будет удалено/никогда не будет видно потребителю. [*]Publish обычно вызывается гораздо быстрее, чем работа может быть истощена. Другими словами, вызовы Publish должны быть максимально быстрыми, при этом очистка элементов не так критична к производительности. [*]Вполне вероятно, что в большинстве практических случаев, когда итератор, возвращаемый из GetValues, перемещается потребителем работы, новые элементы уже будут доступны и не будут фактическими. требуется ожидание; для этого случая было бы полезно иметь быструю оптимизацию. Однако необходимо подготовить реализацию, чтобы этого не произошло, а затем асинхронно ждать доступности новых элементов. [*]Будет только один потребитель (т. е.: GetValues будет вызываться/использоваться только 1 потребителем) [*]Publish не будет вызываться одновременно (хотя может вызываться последовательно из разных потоков)
Моя текущая реализация выглядит следующим образом:
class Conflate: IConflateWorkByKey { частный словарь? _buffered = ноль; частный объект только для чтения _lock = new(); public IAsyncEnumerable GetValues(CancellationToken ct) { while(!ct.IsCancellationRequested) { замок (_lock) { в то время как (_buffered имеет значение null) Monitor.Wait(_lock); результат вар = _buffered; _buffered = ноль; выходной возврат результата; } } } public void Publish (ключ TKey, значение TValue) { замок (_lock) { _buffered ??= новый(); _buffered[ключ] = значение; Monitor.Pulse(_lock); } } } Обратите внимание, что я готов изменить метод Publish, чтобы он возвращал ValueTask, если это будет оптимально для конкретной реализации.
В принципе, это работает нормально, но основная проблема заключается в том, что реализация GetValues здесь не является асинхронной; вызывающий поток правильно блокируется в Monitor.Wait.
Я также пробовал использовать этот шаблон с AsyncMonitor из Nito.AsyncEx, но, к сожалению, AsyncMonitor.Pulse значительно< /em> слишком медленно.
Может ли кто-нибудь придумать более умную реализацию/шаблон, который бы очень быстро с точки зрения публикации значений и в то же время позволял бы действительно асинхронное ожидание/сигнализацию изнутри GetValues?
Изменить: вот еще одна идея. Мне еще предстоит подумать, правильно ли это, и измерить производительность, но перечислю это здесь для обсуждения. Конечно, по-прежнему интересны другие идеи!
class Conflate: IConflateWorkByKey { частный словарь? _buffered = новый (); частный объект только для чтения _lock = new(); частный TaskCompletionSource? _tcs = ноль; public IAsyncEnumerable GetValues(CancellationToken ct) { while(!ct.IsCancellationRequested) { Результат словаря; в то время как (истина) { замок (_lock) { если (_buffered.Any()) { // «Быстрый путь» — следующий результат уже доступен, публикуйте напрямую, не дожидаясь результат = _buffered; _buffered = новый (); перерыв; } _tcs = новый(); } дождитесь _tcs.Task; } выходной возврат результата; } } public void Publish (ключ TKey, значение TValue) { замок (_lock) { _buffered[ключ] = значение; если (_tcs не равно нулю) { _tcs.TrySetResult(); _tcs = ноль; // «Быстрый путь», следующий вызов публикации даже не требует вызова TrySetResult(), если между ними не были удалены значения } } } }
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение