Вот код
Инициализация словаря происходит на уровне класса.
StoredPoints.Keys
Count = 4
Код: Выделить всё
[0]: "SNxxx_B"
[1]: "SNxxx60"
[2]: "SNxxx58"
[3]: "SNxxx_B"
Код: Выделить всё
await _kafkaService.ConsumeMessageWithWait(conf, _hostingServiceOption.Topics, source.Token, async (kafkaMessage) =>
{
try
{
if (!ConnectionService.IsCalculationServiceConnected())
{
// cancel if no service in listening
cts.Cancel();
return;
}
NumberOfPointsReceived++;
Stopwatch.Restart();
var status = await _dataSenderService.SendMessage(kafkaMessage.Topic, kafkaMessage.Message.Key, kafkaMessage.Message.Value, kafkaMessage.Message.Timestamp);
Stopwatch.Stop();
TimeElapsedForProcessing += Stopwatch.ElapsedMilliseconds;
}
catch (Exception ex)
{
_logger.Fatal("{Exception}", ex);
}
}, true, true);
public class DataSenderService
{
public record KafkaMessage(string topic, string key, string message, Confluent.Kafka.Timestamp timestamp);
public static Dictionary StoredPoints = new Dictionary();
public async Task SendMessage(string topic, string key, string message, Confluent.Kafka.Timestamp timestamp)
{
if (!StoredPoints.ContainsKey(key))
{
StoredPoints.Add(key, new List());
}
StoredPoints[key].Add(kafkaMessage);
return MessageStatus.Sent;
}
}

Подробнее здесь: https://stackoverflow.com/questions/787 ... icate-keys
Мобильная версия