Структура или коллекция данных в памяти на C#, которая демонстрирует поведение просмотра и блокировки, такое как AWS SQSC#

Место общения программистов C#
Ответить
Anonymous
 Структура или коллекция данных в памяти на C#, которая демонстрирует поведение просмотра и блокировки, такое как AWS SQS

Сообщение Anonymous »

Мне необходимо исключить сообщения из облачной очереди и затем обработать их. Однако для каждого экземпляра работника мы хотим позволить ему определить, сколько сообщений он может комфортно обрабатывать одновременно. Например, может быть один работник, который может одновременно обрабатывать до 100 сообщений из очереди, а другой может одновременно обрабатывать только 10 сообщений. Поэтому мы хотим, чтобы экземпляр работника всегда обрабатывал такое количество сообщений при условии, что в очереди есть сообщения, чтобы полностью использовать возможности работника.
Мы пытаемся решить эту проблему в нашем коде, используя каналы, как показано ниже:
Я использую канал для связи производителя и потребителя. А затем параллельный словарь для отслеживания потребительских задач, чтобы он не выходил за пределы указанного числа (100 в приведенном ниже случае). Я пробовал искать другие структуры данных, но ни одна из них не дает такого поведения, как просмотр и блокировка, поэтому сообщение выбирается одним потребителем, обрабатывается, а затем удаляется позже. В течение этого времени сообщение не должно быть доступно другим потребителям.
Хотел узнать, существует ли лучшая структура данных для решения этой проблемы, чем та, которую я использую сейчас.
Несколько вопросов и комментариев по коду:
  • Функция ProduceAsync должна удалять из очереди больше сообщений и записывать их в канал, если в настоящее время одновременно обрабатывается менее 100 сообщений (настраиваемых для каждого работника). В противном случае ему следует просто дождаться, пока в канале освободится место.
  • Метод ConsumeAsync в настоящее время считывает одно сообщение из канала и вызывает метод ProcessMessageAsync в Принцип «выстрелил и забыл», приводящий к одновременной обработке всех сообщений. Чтобы удалить этот принцип «выстрелил и забыл», мне нужно было бы использовать другой список задач, чтобы выполнить await Task.WhenAll, что будет дополнительным обслуживанием списка. Я открыт для предложений по дальнейшему улучшению, если я смогу это сделать каким-то образом, используя текущий ConucurrentDictionary, который уже хранит задачи.
internal class Program
{
static readonly Random random = new Random();
private ConcurrentDictionary messageProcessingTasksMap =
new ConcurrentDictionary();

static async Task Main(string[] args)
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel();
};

var channel = Channel.CreateBounded(new BoundedChannelOptions(100)
{
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
});

var program = new Program();
var statsTask = program.TaskStatsAsync(channel.Reader, cts.Token);
var producerTask = program.ProduceAsyc(channel.Writer, cts.Token);
var consumerTask = program.ConsumeAsync(channel.Reader, cts.Token);

await Task.WhenAll(statsTask, producerTask, consumerTask);
Console.WriteLine("Press any key to exit...");
Console.Read();
}

private async Task ProduceAsyc(ChannelWriter channelWriter,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
// 100 will come from config and will be different for
// different workers
if (messageProcessingTasksMap.Count < 100)
{
int messagesToDequeue = 100 - messageProcessingTasksMap.Count;
if (messagesToDequeue > 10)
{
messagesToDequeue = 10;
}

for (int i = 0; i < messagesToDequeue; i++)
{
// In actual scenario, this will dequeue messages
// from cloud queue and write to channel
await channelWriter.WriteAsync($"Message-{i}");
}
}
}

Console.WriteLine("Stopping producer. Marking channel complete");
channelWriter.Complete();
Console.WriteLine("Producer stopped...");
}

private async Task ConsumeAsync(ChannelReader channelReader,
CancellationToken cancellationToken)
{
while (await channelReader.WaitToReadAsync())
{
while (messageProcessingTasksMap.Count < 100 &&
channelReader.TryRead(out var message))
{
var taskId = Guid.NewGuid().ToString();
messageProcessingTasksMap.TryAdd(taskId,
ProcessMessageAsync(message, taskId));
}
}

Console.WriteLine("Stopping consumer...");
Console.WriteLine($"Channel Count: {channelReader.Count}");
}

private async Task ProcessMessageAsync(string message, string taskId)
{
// Add delay to simulate some work
await Task.Delay(random.Next(100, 300));
messageProcessingTasksMap.Remove(taskId, out var _);
}

private async Task TaskStatsAsync(ChannelReader channelReader,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Console.WriteLine(
$"Number of tasks being processed: {messageProcessingTasksMap.Count}" +
$", Channel Count: {channelReader.Count}");
await Task.Delay(1000);
}

Console.WriteLine("Exiting Stats Task");
}
}


Подробнее здесь: https://stackoverflow.com/questions/785 ... nd-lock-be
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»