Поток RabbitMQ по шаблону запроса и ответаC#

Место общения программистов C#
Ответить
Anonymous
 Поток RabbitMQ по шаблону запроса и ответа

Сообщение Anonymous »

Я играл с потоками RabbitMq, используя .net, поэтому идея состоит в том, что у меня есть производитель, который отправляет запросы и ожидает фрагментированного ответа от потребителя. Потребитель обрабатывает запрос и готовит ответ, вот что я создал на данный момент
using System.Buffers;
using System.Net;
using System.Text;
using System.Text.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace RabbitMQConsumer
{
class Program
{
class MessageWithHeaders
{
public Dictionary Headers { get; set; }
public byte[] Body { get; set; }
}

static async Task Main(string[] args)
{
var streamSystemConfig = new StreamSystemConfig
{
UserName = "admin",
Password = "admin",
Endpoints = new List { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5552) }
};

await StartConsumer(streamSystemConfig);

Console.WriteLine("Consumer is running. Press [enter] to exit.");
Console.ReadLine();
}

static async Task StartConsumer(StreamSystemConfig config)
{
var responseStreamSystem = await StreamSystem.Create(config);
await responseStreamSystem.CreateStream(new StreamSpec("response-stream")
{
MaxLengthBytes = 5_000_000_000
});
var responseProducer = await Producer.Create(new ProducerConfig(responseStreamSystem, "response-stream"));

var requestStreamSystem = await StreamSystem.Create(config);
await requestStreamSystem.CreateStream(new StreamSpec("request-stream")
{
MaxLengthBytes = 5_000_000_000
});

var consumerConfig = new ConsumerConfig(requestStreamSystem, "request-stream")
{
OffsetSpec = new OffsetTypeNext(),
MessageHandler = async (stream, _, consumer, message) =>
{
try
{
var messageContent = ToByteArray(message.Data.Contents);
var messageWithHeaders = JsonSerializer.Deserialize(Encoding.UTF8.GetString(messageContent));
var requestMessage = messageWithHeaders!.Body;
var correlationId = messageWithHeaders.Headers["correlation_id"];
var requestText = Encoding.UTF8.GetString(requestMessage);
var receiveTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var latency = receiveTime - long.Parse(messageWithHeaders.Headers["timestamp"]);
Console.WriteLine($"[{DateTime.UtcNow}] Received request: {requestText} with correlation ID: {correlationId}, Latency: {latency}ms");

var loremIpsum = new byte[100 * 1024 * 1024]; // 100 MB of dummy data
var random = new Random();
random.NextBytes(loremIpsum);

var chunkSize = 5 * 1024 * 1024; // 5 MB
var totalChunks = (int)Math.Ceiling((double)loremIpsum.Length / chunkSize);

for (int i = 0; i < totalChunks; i++)
{
var chunk = new byte[Math.Min(chunkSize, loremIpsum.Length - (i * chunkSize))];
Array.Copy(loremIpsum, i * chunkSize, chunk, 0, chunk.Length);

var responseChunk = new Message(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new MessageWithHeaders
{
Headers = new Dictionary
{
{ "correlation_id", correlationId },
{ "chunk_index", i.ToString() },
{ "total_chunks", totalChunks.ToString() },
{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() }
},
Body = chunk
})));

try
{
Console.WriteLine($"[{DateTime.UtcNow}] Sending response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
await responseProducer.Send(responseChunk);
Console.WriteLine($"[{DateTime.UtcNow}] Successfully sent response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Failed to send response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}. Error: {ex.Message}");
}
}

Console.WriteLine($"[{DateTime.UtcNow}] Processed request and sent response in {totalChunks} chunks for: {requestText}");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Error processing request: {ex.Message}");
}

await Task.CompletedTask;
}
};

await Consumer.Create(consumerConfig);
}

static byte[] ToByteArray(ReadOnlySequence sequence)
{
if (sequence.IsSingleSegment)
{
return sequence.First.ToArray();
}

var array = new byte[sequence.Length];
sequence.CopyTo(array);
return array;
}
}
}

Не он получает запросы и выдает ответ, но я не знаю, почему он не отправляет их обратно
Console.WriteLine($"[{DateTime.UtcNow}] Sending response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
await responseProducer.Send(responseChunk);
Console.WriteLine($"[{DateTime.UtcNow}] Successfully sent response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");

Я проверил, открыт ли производитель ответа, и он открыт, никаких исключений не генерируется, и когда я проверяю диспетчер RabbitMQ, ответное сообщение не было отправлено в ответПродуцент.
Для лучшего контекста, вот мой продюсер:
using System.Buffers;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Text.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace RabbitMQProducer
{
class Program
{
class MessageWithHeaders
{
public Dictionary Headers { get; set; }
public byte[] Body { get; set; }
}

static async Task Main(string[] args)
{
var streamSystemConfig = new StreamSystemConfig
{
UserName = "admin",
Password = "admin",
Endpoints = new List { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5552) }
};

var streamSystem = await StreamSystem.Create(streamSystemConfig);

await streamSystem.CreateStream(new StreamSpec("request-stream")
{
MaxLengthBytes = 5_000_000_000
});

var responseStreamSystem = await StreamSystem.Create(streamSystemConfig);
await responseStreamSystem.CreateStream(new StreamSpec("response-stream")
{
MaxLengthBytes = 5_000_000_000
});

var requestProducer = await Producer.Create(new ProducerConfig(responseStreamSystem, "request-stream"));
var receivedChunks = new ConcurrentDictionary();
var totalChunksMap = new ConcurrentDictionary();

var responseConsumerConfig = new ConsumerConfig(streamSystem, "response-stream")
{
OffsetSpec = new OffsetTypeNext(),
MessageHandler = async (stream, _, consumer, message) =>
{
try
{
var messageContent = ToByteArray(message.Data.Contents);
var messageWithHeaders = JsonSerializer.Deserialize(Encoding.UTF8.GetString(messageContent));
var responseChunk = messageWithHeaders.Body;
var correlationId = messageWithHeaders.Headers["correlation_id"];
var chunkIndex = int.Parse(messageWithHeaders.Headers["chunk_index"]);
var totalChunks = int.Parse(messageWithHeaders.Headers["total_chunks"]);
var receiveTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var latency = receiveTime - long.Parse(messageWithHeaders.Headers["timestamp"]);

Console.WriteLine($"[{DateTime.UtcNow}] Received response chunk {chunkIndex + 1} of {totalChunks} for correlation ID: {correlationId}, Latency: {latency}ms");

// Add the chunk to the list of received chunks for the correlation ID
var chunks = receivedChunks.GetOrAdd(correlationId, _ => new List());
lock (chunks)
{
chunks.Add(responseChunk);
}

// Store the total number of chunks expected
totalChunksMap[correlationId] = totalChunks;

// Check if all chunks have been received
if (chunks.Count == totalChunks)
{
// Reassemble the chunks
var fullMessage = chunks.SelectMany(chunk => chunk).ToArray();
var reassembledText = Encoding.UTF8.GetString(fullMessage);

Console.WriteLine($"[{DateTime.UtcNow}] Reassembled full message for correlation ID {correlationId}: {reassembledText}");

List received;
int total;

// Clean up the data structures
receivedChunks.TryRemove(correlationId, out received);
totalChunksMap.TryRemove(correlationId, out total);
}
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Error processing response: {ex.Message}");
}

await Task.CompletedTask;
}
};

await Consumer.Create(responseConsumerConfig);

for (int i = 0; i < 5; i++)
{
var correlationId = Guid.NewGuid().ToString();
var requestText = $"Request {i + 1}";
var requestMessage = new Message(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new MessageWithHeaders
{
Headers = new Dictionary
{
{ "correlation_id", correlationId },
{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() }
},
Body = Encoding.UTF8.GetBytes(requestText)
})));

Console.WriteLine($"[{DateTime.UtcNow}] Sending request: {requestText}");
await requestProducer.Send(requestMessage);
}

Console.WriteLine("Producer is running. Press [enter] to exit.");
Console.ReadLine();
}

static byte[] ToByteArray(ReadOnlySequence sequence)
{
if (sequence.IsSingleSegment)
{
return sequence.First.ToArray();
}

var array = new byte[sequence.Length];
sequence.CopyTo(array);
return array;
}
}
}


Подробнее здесь: https://stackoverflow.com/questions/787 ... se-pattern
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»