Мы пытаемся решить эту проблему в нашем коде, используя каналы, как показано ниже:
Я использую канал для связи производителя и потребителя. А затем параллельный словарь для отслеживания потребительских задач, чтобы он не выходил за пределы указанного числа (100 в приведенном ниже случае). Я пробовал искать другие структуры данных, но ни одна из них не дает такого поведения, как просмотр и блокировка, поэтому сообщение выбирается одним потребителем, обрабатывается, а затем удаляется позже. В течение этого времени сообщение не должно быть доступно другим потребителям.
Хотел узнать, существует ли лучшая структура данных для решения этой проблемы, чем та, которую я использую сейчас.
Несколько вопросов и комментариев по коду:
- Функция ProduceAsync должна удалять из очереди больше сообщений и записывать их в канал, если в настоящее время одновременно обрабатывается менее 100 сообщений (настраиваемых для каждого работника). В противном случае ему следует просто дождаться, пока в канале освободится место.
- Метод ConsumeAsync в настоящее время считывает одно сообщение из канала и вызывает метод ProcessMessageAsync в Принцип «выстрелил и забыл», приводящий к одновременной обработке всех сообщений. Чтобы удалить этот принцип «выстрелил и забыл», мне нужно было бы использовать другой список задач, чтобы выполнить await Task.WhenAll, что будет дополнительным обслуживанием списка. Я открыт для предложений по дальнейшему улучшению, если я смогу это сделать каким-то образом, используя текущий ConucurrentDictionary, который уже хранит задачи.
{
static readonly Random random = new Random();
private ConcurrentDictionary messageProcessingTasksMap =
new ConcurrentDictionary();
static async Task Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel();
};
var channel = Channel.CreateBounded(new BoundedChannelOptions(100)
{
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
});
var program = new Program();
var statsTask = program.TaskStatsAsync(channel.Reader, cts.Token);
var producerTask = program.ProduceAsyc(channel.Writer, cts.Token);
var consumerTask = program.ConsumeAsync(channel.Reader, cts.Token);
await Task.WhenAll(statsTask, producerTask, consumerTask);
Console.WriteLine("Press any key to exit...");
Console.Read();
}
private async Task ProduceAsyc(ChannelWriter channelWriter,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
// 100 will come from config and will be different for
// different workers
if (messageProcessingTasksMap.Count < 100)
{
int messagesToDequeue = 100 - messageProcessingTasksMap.Count;
if (messagesToDequeue > 10)
{
messagesToDequeue = 10;
}
for (int i = 0; i < messagesToDequeue; i++)
{
// In actual scenario, this will dequeue messages
// from cloud queue and write to channel
await channelWriter.WriteAsync($"Message-{i}");
}
}
}
Console.WriteLine("Stopping producer. Marking channel complete");
channelWriter.Complete();
Console.WriteLine("Producer stopped...");
}
private async Task ConsumeAsync(ChannelReader channelReader,
CancellationToken cancellationToken)
{
while (await channelReader.WaitToReadAsync())
{
while (messageProcessingTasksMap.Count < 100 &&
channelReader.TryRead(out var message))
{
var taskId = Guid.NewGuid().ToString();
messageProcessingTasksMap.TryAdd(taskId,
ProcessMessageAsync(message, taskId));
}
}
Console.WriteLine("Stopping consumer...");
Console.WriteLine($"Channel Count: {channelReader.Count}");
}
private async Task ProcessMessageAsync(string message, string taskId)
{
// Add delay to simulate some work
await Task.Delay(random.Next(100, 300));
messageProcessingTasksMap.Remove(taskId, out var _);
}
private async Task TaskStatsAsync(ChannelReader channelReader,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Console.WriteLine(
$"Number of tasks being processed: {messageProcessingTasksMap.Count}" +
$", Channel Count: {channelReader.Count}");
await Task.Delay(1000);
}
Console.WriteLine("Exiting Stats Task");
}
}
Подробнее здесь: https://stackoverflow.com/questions/785 ... nd-lock-be
Мобильная версия