Я играл с потоками RabbitMq, используя .net, поэтому идея состоит в том, что у меня есть производитель, который отправляет запросы и ожидает фрагментированного ответа от потребителя. Потребитель обрабатывает запрос и готовит ответ, вот что я создал на данный момент
using System.Buffers;
using System.Net;
using System.Text;
using System.Text.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace RabbitMQConsumer
{
class Program
{
class MessageWithHeaders
{
public Dictionary Headers { get; set; }
public byte[] Body { get; set; }
}
static async Task Main(string[] args)
{
var streamSystemConfig = new StreamSystemConfig
{
UserName = "admin",
Password = "admin",
Endpoints = new List { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5552) }
};
await StartConsumer(streamSystemConfig);
Console.WriteLine("Consumer is running. Press [enter] to exit.");
Console.ReadLine();
}
static async Task StartConsumer(StreamSystemConfig config)
{
var responseStreamSystem = await StreamSystem.Create(config);
await responseStreamSystem.CreateStream(new StreamSpec("response-stream")
{
MaxLengthBytes = 5_000_000_000
});
var responseProducer = await Producer.Create(new ProducerConfig(responseStreamSystem, "response-stream"));
var requestStreamSystem = await StreamSystem.Create(config);
await requestStreamSystem.CreateStream(new StreamSpec("request-stream")
{
MaxLengthBytes = 5_000_000_000
});
var consumerConfig = new ConsumerConfig(requestStreamSystem, "request-stream")
{
OffsetSpec = new OffsetTypeNext(),
MessageHandler = async (stream, _, consumer, message) =>
{
try
{
var messageContent = ToByteArray(message.Data.Contents);
var messageWithHeaders = JsonSerializer.Deserialize(Encoding.UTF8.GetString(messageContent));
var requestMessage = messageWithHeaders!.Body;
var correlationId = messageWithHeaders.Headers["correlation_id"];
var requestText = Encoding.UTF8.GetString(requestMessage);
var receiveTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var latency = receiveTime - long.Parse(messageWithHeaders.Headers["timestamp"]);
Console.WriteLine($"[{DateTime.UtcNow}] Received request: {requestText} with correlation ID: {correlationId}, Latency: {latency}ms");
var loremIpsum = new byte[100 * 1024 * 1024]; // 100 MB of dummy data
var random = new Random();
random.NextBytes(loremIpsum);
var chunkSize = 5 * 1024 * 1024; // 5 MB
var totalChunks = (int)Math.Ceiling((double)loremIpsum.Length / chunkSize);
for (int i = 0; i < totalChunks; i++)
{
var chunk = new byte[Math.Min(chunkSize, loremIpsum.Length - (i * chunkSize))];
Array.Copy(loremIpsum, i * chunkSize, chunk, 0, chunk.Length);
var responseChunk = new Message(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new MessageWithHeaders
{
Headers = new Dictionary
{
{ "correlation_id", correlationId },
{ "chunk_index", i.ToString() },
{ "total_chunks", totalChunks.ToString() },
{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() }
},
Body = chunk
})));
try
{
Console.WriteLine($"[{DateTime.UtcNow}] Sending response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
await responseProducer.Send(responseChunk);
Console.WriteLine($"[{DateTime.UtcNow}] Successfully sent response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Failed to send response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}. Error: {ex.Message}");
}
}
Console.WriteLine($"[{DateTime.UtcNow}] Processed request and sent response in {totalChunks} chunks for: {requestText}");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Error processing request: {ex.Message}");
}
await Task.CompletedTask;
}
};
await Consumer.Create(consumerConfig);
}
static byte[] ToByteArray(ReadOnlySequence sequence)
{
if (sequence.IsSingleSegment)
{
return sequence.First.ToArray();
}
var array = new byte[sequence.Length];
sequence.CopyTo(array);
return array;
}
}
}
Не он получает запросы и выдает ответ, но я не знаю, почему он не отправляет их обратно
Console.WriteLine($"[{DateTime.UtcNow}] Sending response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
await responseProducer.Send(responseChunk);
Console.WriteLine($"[{DateTime.UtcNow}] Successfully sent response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
Я проверил, открыт ли производитель ответа, и он открыт, никаких исключений не генерируется, и когда я проверяю диспетчер RabbitMQ, ответное сообщение не было отправлено в ответПродуцент.
Для лучшего контекста, вот мой продюсер:
using System.Buffers;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Text.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace RabbitMQProducer
{
class Program
{
class MessageWithHeaders
{
public Dictionary Headers { get; set; }
public byte[] Body { get; set; }
}
static async Task Main(string[] args)
{
var streamSystemConfig = new StreamSystemConfig
{
UserName = "admin",
Password = "admin",
Endpoints = new List { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5552) }
};
var streamSystem = await StreamSystem.Create(streamSystemConfig);
await streamSystem.CreateStream(new StreamSpec("request-stream")
{
MaxLengthBytes = 5_000_000_000
});
var responseStreamSystem = await StreamSystem.Create(streamSystemConfig);
await responseStreamSystem.CreateStream(new StreamSpec("response-stream")
{
MaxLengthBytes = 5_000_000_000
});
var requestProducer = await Producer.Create(new ProducerConfig(responseStreamSystem, "request-stream"));
var receivedChunks = new ConcurrentDictionary();
var totalChunksMap = new ConcurrentDictionary();
var responseConsumerConfig = new ConsumerConfig(streamSystem, "response-stream")
{
OffsetSpec = new OffsetTypeNext(),
MessageHandler = async (stream, _, consumer, message) =>
{
try
{
var messageContent = ToByteArray(message.Data.Contents);
var messageWithHeaders = JsonSerializer.Deserialize(Encoding.UTF8.GetString(messageContent));
var responseChunk = messageWithHeaders.Body;
var correlationId = messageWithHeaders.Headers["correlation_id"];
var chunkIndex = int.Parse(messageWithHeaders.Headers["chunk_index"]);
var totalChunks = int.Parse(messageWithHeaders.Headers["total_chunks"]);
var receiveTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var latency = receiveTime - long.Parse(messageWithHeaders.Headers["timestamp"]);
Console.WriteLine($"[{DateTime.UtcNow}] Received response chunk {chunkIndex + 1} of {totalChunks} for correlation ID: {correlationId}, Latency: {latency}ms");
// Add the chunk to the list of received chunks for the correlation ID
var chunks = receivedChunks.GetOrAdd(correlationId, _ => new List());
lock (chunks)
{
chunks.Add(responseChunk);
}
// Store the total number of chunks expected
totalChunksMap[correlationId] = totalChunks;
// Check if all chunks have been received
if (chunks.Count == totalChunks)
{
// Reassemble the chunks
var fullMessage = chunks.SelectMany(chunk => chunk).ToArray();
var reassembledText = Encoding.UTF8.GetString(fullMessage);
Console.WriteLine($"[{DateTime.UtcNow}] Reassembled full message for correlation ID {correlationId}: {reassembledText}");
List received;
int total;
// Clean up the data structures
receivedChunks.TryRemove(correlationId, out received);
totalChunksMap.TryRemove(correlationId, out total);
}
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Error processing response: {ex.Message}");
}
await Task.CompletedTask;
}
};
await Consumer.Create(responseConsumerConfig);
for (int i = 0; i < 5; i++)
{
var correlationId = Guid.NewGuid().ToString();
var requestText = $"Request {i + 1}";
var requestMessage = new Message(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new MessageWithHeaders
{
Headers = new Dictionary
{
{ "correlation_id", correlationId },
{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() }
},
Body = Encoding.UTF8.GetBytes(requestText)
})));
Console.WriteLine($"[{DateTime.UtcNow}] Sending request: {requestText}");
await requestProducer.Send(requestMessage);
}
Console.WriteLine("Producer is running. Press [enter] to exit.");
Console.ReadLine();
}
static byte[] ToByteArray(ReadOnlySequence sequence)
{
if (sequence.IsSingleSegment)
{
return sequence.First.ToArray();
}
var array = new byte[sequence.Length];
sequence.CopyTo(array);
return array;
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/787 ... se-pattern
Поток RabbitMQ по шаблону запроса и ответа ⇐ C#
Место общения программистов C#
-
Anonymous
1721826911
Anonymous
Я играл с потоками RabbitMq, используя .net, поэтому идея состоит в том, что у меня есть производитель, который отправляет запросы и ожидает фрагментированного ответа от потребителя. Потребитель обрабатывает запрос и готовит ответ, вот что я создал на данный момент
using System.Buffers;
using System.Net;
using System.Text;
using System.Text.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace RabbitMQConsumer
{
class Program
{
class MessageWithHeaders
{
public Dictionary Headers { get; set; }
public byte[] Body { get; set; }
}
static async Task Main(string[] args)
{
var streamSystemConfig = new StreamSystemConfig
{
UserName = "admin",
Password = "admin",
Endpoints = new List { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5552) }
};
await StartConsumer(streamSystemConfig);
Console.WriteLine("Consumer is running. Press [enter] to exit.");
Console.ReadLine();
}
static async Task StartConsumer(StreamSystemConfig config)
{
var responseStreamSystem = await StreamSystem.Create(config);
await responseStreamSystem.CreateStream(new StreamSpec("response-stream")
{
MaxLengthBytes = 5_000_000_000
});
var responseProducer = await Producer.Create(new ProducerConfig(responseStreamSystem, "response-stream"));
var requestStreamSystem = await StreamSystem.Create(config);
await requestStreamSystem.CreateStream(new StreamSpec("request-stream")
{
MaxLengthBytes = 5_000_000_000
});
var consumerConfig = new ConsumerConfig(requestStreamSystem, "request-stream")
{
OffsetSpec = new OffsetTypeNext(),
MessageHandler = async (stream, _, consumer, message) =>
{
try
{
var messageContent = ToByteArray(message.Data.Contents);
var messageWithHeaders = JsonSerializer.Deserialize(Encoding.UTF8.GetString(messageContent));
var requestMessage = messageWithHeaders!.Body;
var correlationId = messageWithHeaders.Headers["correlation_id"];
var requestText = Encoding.UTF8.GetString(requestMessage);
var receiveTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var latency = receiveTime - long.Parse(messageWithHeaders.Headers["timestamp"]);
Console.WriteLine($"[{DateTime.UtcNow}] Received request: {requestText} with correlation ID: {correlationId}, Latency: {latency}ms");
var loremIpsum = new byte[100 * 1024 * 1024]; // 100 MB of dummy data
var random = new Random();
random.NextBytes(loremIpsum);
var chunkSize = 5 * 1024 * 1024; // 5 MB
var totalChunks = (int)Math.Ceiling((double)loremIpsum.Length / chunkSize);
for (int i = 0; i < totalChunks; i++)
{
var chunk = new byte[Math.Min(chunkSize, loremIpsum.Length - (i * chunkSize))];
Array.Copy(loremIpsum, i * chunkSize, chunk, 0, chunk.Length);
var responseChunk = new Message(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new MessageWithHeaders
{
Headers = new Dictionary
{
{ "correlation_id", correlationId },
{ "chunk_index", i.ToString() },
{ "total_chunks", totalChunks.ToString() },
{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() }
},
Body = chunk
})));
try
{
Console.WriteLine($"[{DateTime.UtcNow}] Sending response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
await responseProducer.Send(responseChunk);
Console.WriteLine($"[{DateTime.UtcNow}] Successfully sent response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Failed to send response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}. Error: {ex.Message}");
}
}
Console.WriteLine($"[{DateTime.UtcNow}] Processed request and sent response in {totalChunks} chunks for: {requestText}");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Error processing request: {ex.Message}");
}
await Task.CompletedTask;
}
};
await Consumer.Create(consumerConfig);
}
static byte[] ToByteArray(ReadOnlySequence sequence)
{
if (sequence.IsSingleSegment)
{
return sequence.First.ToArray();
}
var array = new byte[sequence.Length];
sequence.CopyTo(array);
return array;
}
}
}
Не он получает запросы и выдает ответ, но я не знаю, почему он не отправляет их обратно
Console.WriteLine($"[{DateTime.UtcNow}] Sending response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
await responseProducer.Send(responseChunk);
Console.WriteLine($"[{DateTime.UtcNow}] Successfully sent response chunk {i + 1} of {totalChunks} for correlation ID: {correlationId}");
Я проверил, открыт ли производитель ответа, и он открыт, никаких исключений не генерируется, и когда я проверяю диспетчер RabbitMQ, ответное сообщение не было отправлено в ответПродуцент.
Для лучшего контекста, вот мой продюсер:
using System.Buffers;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Text.Json;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace RabbitMQProducer
{
class Program
{
class MessageWithHeaders
{
public Dictionary Headers { get; set; }
public byte[] Body { get; set; }
}
static async Task Main(string[] args)
{
var streamSystemConfig = new StreamSystemConfig
{
UserName = "admin",
Password = "admin",
Endpoints = new List { new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5552) }
};
var streamSystem = await StreamSystem.Create(streamSystemConfig);
await streamSystem.CreateStream(new StreamSpec("request-stream")
{
MaxLengthBytes = 5_000_000_000
});
var responseStreamSystem = await StreamSystem.Create(streamSystemConfig);
await responseStreamSystem.CreateStream(new StreamSpec("response-stream")
{
MaxLengthBytes = 5_000_000_000
});
var requestProducer = await Producer.Create(new ProducerConfig(responseStreamSystem, "request-stream"));
var receivedChunks = new ConcurrentDictionary();
var totalChunksMap = new ConcurrentDictionary();
var responseConsumerConfig = new ConsumerConfig(streamSystem, "response-stream")
{
OffsetSpec = new OffsetTypeNext(),
MessageHandler = async (stream, _, consumer, message) =>
{
try
{
var messageContent = ToByteArray(message.Data.Contents);
var messageWithHeaders = JsonSerializer.Deserialize(Encoding.UTF8.GetString(messageContent));
var responseChunk = messageWithHeaders.Body;
var correlationId = messageWithHeaders.Headers["correlation_id"];
var chunkIndex = int.Parse(messageWithHeaders.Headers["chunk_index"]);
var totalChunks = int.Parse(messageWithHeaders.Headers["total_chunks"]);
var receiveTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var latency = receiveTime - long.Parse(messageWithHeaders.Headers["timestamp"]);
Console.WriteLine($"[{DateTime.UtcNow}] Received response chunk {chunkIndex + 1} of {totalChunks} for correlation ID: {correlationId}, Latency: {latency}ms");
// Add the chunk to the list of received chunks for the correlation ID
var chunks = receivedChunks.GetOrAdd(correlationId, _ => new List());
lock (chunks)
{
chunks.Add(responseChunk);
}
// Store the total number of chunks expected
totalChunksMap[correlationId] = totalChunks;
// Check if all chunks have been received
if (chunks.Count == totalChunks)
{
// Reassemble the chunks
var fullMessage = chunks.SelectMany(chunk => chunk).ToArray();
var reassembledText = Encoding.UTF8.GetString(fullMessage);
Console.WriteLine($"[{DateTime.UtcNow}] Reassembled full message for correlation ID {correlationId}: {reassembledText}");
List received;
int total;
// Clean up the data structures
receivedChunks.TryRemove(correlationId, out received);
totalChunksMap.TryRemove(correlationId, out total);
}
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.UtcNow}] Error processing response: {ex.Message}");
}
await Task.CompletedTask;
}
};
await Consumer.Create(responseConsumerConfig);
for (int i = 0; i < 5; i++)
{
var correlationId = Guid.NewGuid().ToString();
var requestText = $"Request {i + 1}";
var requestMessage = new Message(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new MessageWithHeaders
{
Headers = new Dictionary
{
{ "correlation_id", correlationId },
{ "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() }
},
Body = Encoding.UTF8.GetBytes(requestText)
})));
Console.WriteLine($"[{DateTime.UtcNow}] Sending request: {requestText}");
await requestProducer.Send(requestMessage);
}
Console.WriteLine("Producer is running. Press [enter] to exit.");
Console.ReadLine();
}
static byte[] ToByteArray(ReadOnlySequence sequence)
{
if (sequence.IsSingleSegment)
{
return sequence.First.ToArray();
}
var array = new byte[sequence.Length];
sequence.CopyTo(array);
return array;
}
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/78788594/rabbitmq-stream-by-request-and-response-pattern[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия