В настоящее время я изучаю многопоточность для C++.
Для контекста у меня есть серверный код, который в основном принимает сообщение protobuf со случайной координатой в Германии и возвращает ответ (название района ) клиенту. Для этого я использовал свои собственные геоданные и т. д. Поскольку обработки местоположения недостаточно для одного потока, я решил сделать это своего рода многопоточностью производитель-потребитель, то есть заставить обработку местоположения использовать несколько потоков.
Проблема в том, что , почему-то, когда я использую несколько экземпляров клиента (больше потоков и подключений от клиента), сообщения не возвращаются правильно. В некоторых экземплярах клиента регистрировались ошибки, например, клиент получил ответ на автомобиль, который не дождался ответа.
Прикреплен код соответствующих файлов. Я бы с радостью ответил или дал больше контекста, если это необходимо. Спасибо за помощь!
Connection.cc
void consumer(GeoData &geodata, Statistic &globalStats, ClientConfig &config) {
Client::updateBufferSize(config);
while (true) {
// Directly dequeue a single message from the shared queue
auto [client, msg] = sharedQueue.dequeue();
if (!client) {
std::cerr processLocationUpdates(msg.location_request, resp.location_response);
// Send the response back to the client
if (!client->writeMessage(resp)) {
std::cerr ai_family, res->ai_socktype, res->ai_protocol);
if (sock == -1) {
perror("socket");
return 1;
}
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
perror("setsockopt");
return 1;
}
// Bind to the address
if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
perror("bind");
return 0;
}
freeaddrinfo(res);
if (listen(sock, BACKLOG) == -1) {
perror("listen");
return 0;
}
GeoData geodata;
if (!geodata.loadData("geodata/build/bezirke-final.geodata", nodeCapacity)) {
die("Could not load geodata");
}
// stats thread
Statistic stats;
if (pthread_create(&thread, NULL, statistic_thread, &stats) != 0) {
die("Failed to create statistics thread");
}
signal(SIGPIPE, SIG_IGN);
// pool of consumer threads
const int numConsumers = 4; // we can play around with this parameter of num of consumers
std::vector consumerThreads;
for (int i = 0; i < numConsumers; ++i) {
consumerThreads.emplace_back(consumer, std::ref(geodata), std::ref(stats), std::ref(clientConfig));
}
// Accept incoming connections and create producer threads
while (true) {
socklen_t size = sizeof(struct sockaddr_in);
struct sockaddr_in their_addr;
int client_fd = accept(sock, (struct sockaddr*)&their_addr, &size);
if (client_fd == -1) {
perror("accept");
continue; // Skip to the next connection attempt on failure
}
// Create a Client object for the new connection
Client* client = new Client(&geodata, client_fd, clientConfig);
client->setStatistic(&stats);
if (pthread_create(&thread, NULL, connection_thread, client) != 0) {
fprintf(stderr, "Failed to create producer thread\n");
delete client; // Cleanup client object on failure
}
}
// Wait for all consumer threads to finish (will not happen in this loop)
for (auto& thread : consumerThreads) {
thread.join();
}
close(sock); // Close the listening socket when the server shuts down
return 0;
}
#pragma once
#include
#include "geodata.h"
#include
#include
#include
#include
#include
#include "../proto/message.h"
#include
#include
#include
#include
/**
* @brief Structure to hold statistics about the number of messages processed.
*/
struct Statistic {
std::atomic countMsgs{0};
std::atomic countCacheHits{0};
std::atomic countCacheMisses{0};
};
/**
* @brief Structure to hold the client configuarations.
*/
struct ClientConfig {
size_t write_buffer_size;
size_t read_bytes_buffer_size;
size_t read_buffer_size;
};
/**
* @brief Thread-safe queue for multiple producers and consumers.
*
* Provides synchronized access to a queue for multiple producers and consumers.
*/
template
class ThreadSafeQueue {
private:
std::queue queue;
std::mutex mutex;
std::condition_variable cv;
public:
void enqueue(T item) {
{
std::lock_guard lock(mutex);
queue.push(std::move(item));
}
cv.notify_one();
}
T dequeue() {
std::unique_lock lock(mutex);
cv.wait(lock, [this] { return !queue.empty(); });
T item = std::move(queue.front());
queue.pop();
return item;
}
bool isEmpty() {
std::lock_guard lock(mutex);
return queue.empty();
}
};
/**
* @class Client
* @brief Represents a client connection and manages the communication between the server and client.
*
* The Client class handles receiving messages from the client, processing location updates,
* and sending responses back to the client. It also uses GEOS (Geometry Engine Open Source)
* for location-based queries and maintains statistics about message processing.
*/
class Client {
int client_fd; ///< The file descriptor for the client connection.
GeoData *geo; ///< A pointer to GeoData used for querying locations.
Statistic *stats = nullptr; ///< A pointer to the statistics object for tracking processed messages.
GEOSContextHandle_t geos_ctx;
ClientConfig &config;
static thread_local std::vector write_buffer;
static thread_local std::vector read_bytes_buffer;
static thread_local std::vector read_buffer;
static thread_local size_t write_buffer_offset;
static thread_local size_t read_bytes_buffer_offset;
static thread_local unsigned short write_buffer_repeat;
public:
/**
* @brief Constructor to initialize the Client object.
*
* Initializes the client object with the socket file descriptor and GeoData.
* A new GEOS context is created for geospatial operations.
*
* @param geo Pointer to the GeoData object used for querying location data.
* @param socket The socket file descriptor for client communication.
*/
Client(GeoData *geo, int socket, ClientConfig &config) : client_fd(socket), geo(geo), config(config) {
geos_ctx = geo->newContext();
if(write_buffer.empty()) {
write_buffer.resize(config.write_buffer_size);
}
if(read_bytes_buffer.empty()) {
read_bytes_buffer.resize(config.read_bytes_buffer_size);
}
if(read_buffer.empty()) {
read_buffer.resize(config.read_buffer_size);
}
}
/**
* @brief Destructor to clean up resources.
*
* Closes the client socket and cleans up the GEOS context.
*/
~Client() {
flushWriteBuffer();
close(client_fd);
GEOS_finish_r(geos_ctx);
}
/**
* @brief Set the statistics object to track message count.
*
* Associates a Statistic object with the client to record the number of messages processed.
*
* @param stats Pointer to a Statistic object.
*/
void setStatistic(Statistic *stats) { this->stats = stats; }
/**
* @brief Reads a message from the client socket.
*
* Reads a `wire::ClientMessage` from the client. The message is prefixed with
* a 4-byte length header indicating the size of the message.
*
* @param msg Reference to the `wire::ClientMessage` object to be populated with data.
* @return true if the message was successfully read, false otherwise.
*/
bool readMessage(ClientMessage &msg);
/**
* @brief Reads a specified number of bytes from the client socket into the provided buffer.
*
* This function now uses a thread-local buffer to minimize repeated allocations.
*
* @param client_fd The file descriptor to read from.
* @param buffer The buffer to copy data into.
* @param len The number of bytes to read.
* @return true if the specified number of bytes was successfully read, false otherwise.
*/
static bool read_bytes(int client_fd, char *buf, size_t len);
/**
* @brief Sends a response message to the client socket.
*
* Buffers the `wire::ClientResponse` to be sent, flushing only when the buffer is full.
*
* @param msg The `wire::ClientResponse` to send.
* @return true if the message was successfully buffered or sent, false otherwise.
*/
bool writeMessage(ClientResponse &msg);
/**
* @brief Flushes the write buffer to the client socket.
*
* Ensures all buffered messages are sent to the client.
*
* @return true if the buffer was successfully flushed, false otherwise.
*/
bool flushWriteBuffer();
/**
* @brief Processes a location update request.
*
* This method processes the location data contained in the `wire::LocationUpdateRequest`,
* performs a geospatial query based on longitude and latitude, and populates the
* `wire::LocationUpdateResponse` with the results.
*
* @param msg The `wire::LocationUpdateRequest` containing the car's location data.
* @param resp The `wire::LocationUpdateResponse` to populate with the result of the query.
* @return true if the location update was successfully processed, false otherwise.
*/
bool processLocationUpdates(const LocationUpdateRequest &msg,
LocationUpdateResponse &resp);
/**
* @brief Resize read and write buffers.
*
* This method resizes the read and write buffers.
*
* @param write_size The write buffer size.
* @param read_bytes_size The read bytes buffer size.
* @param read_size The read buffer size.
*/
static void updateBufferSize(ClientConfig &config);
bool isCacheHit(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
bool queryLocation(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
ClientConfig & getConfig();
};
/**
* @brief Function to process client messages using the consumer role.
*
* This function is run in worker threads to handle messages from the shared queue.
* It dequeues messages, processes location updates, and sends responses back to clients.
*
* @param geodata Reference to the GeoData object for geospatial queries.
* @param stats Reference to the statistics object to track processed messages.
*/
void consumer(GeoData& geodata, Statistic& stats, ClientConfig& config);
Подробнее здесь: https://stackoverflow.com/questions/793 ... he-correct
Проблема многопоточности C++ – как вернуть правильное сообщение правильному клиенту? ⇐ C++
Программы на C++. Форум разработчиков
1736182927
Anonymous
В настоящее время я изучаю многопоточность для C++.
Для контекста у меня есть серверный код, который в основном принимает сообщение protobuf со случайной координатой в Германии и возвращает ответ (название района ) клиенту. Для этого я использовал свои собственные геоданные и т. д. Поскольку обработки местоположения недостаточно для одного потока, я решил сделать это своего рода многопоточностью производитель-потребитель, то есть заставить обработку местоположения использовать несколько потоков.
Проблема в том, что , почему-то, когда я использую несколько экземпляров клиента (больше потоков и подключений от клиента), сообщения не возвращаются правильно. В некоторых экземплярах клиента регистрировались ошибки, например, клиент получил ответ на автомобиль, который не дождался ответа.
Прикреплен код соответствующих файлов. Я бы с радостью ответил или дал больше контекста, если это необходимо. Спасибо за помощь!
Connection.cc
void consumer(GeoData &geodata, Statistic &globalStats, ClientConfig &config) {
Client::updateBufferSize(config);
while (true) {
// Directly dequeue a single message from the shared queue
auto [client, msg] = sharedQueue.dequeue();
if (!client) {
std::cerr processLocationUpdates(msg.location_request, resp.location_response);
// Send the response back to the client
if (!client->writeMessage(resp)) {
std::cerr ai_family, res->ai_socktype, res->ai_protocol);
if (sock == -1) {
perror("socket");
return 1;
}
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
perror("setsockopt");
return 1;
}
// Bind to the address
if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
perror("bind");
return 0;
}
freeaddrinfo(res);
if (listen(sock, BACKLOG) == -1) {
perror("listen");
return 0;
}
GeoData geodata;
if (!geodata.loadData("geodata/build/bezirke-final.geodata", nodeCapacity)) {
die("Could not load geodata");
}
// stats thread
Statistic stats;
if (pthread_create(&thread, NULL, statistic_thread, &stats) != 0) {
die("Failed to create statistics thread");
}
signal(SIGPIPE, SIG_IGN);
// pool of consumer threads
const int numConsumers = 4; // we can play around with this parameter of num of consumers
std::vector consumerThreads;
for (int i = 0; i < numConsumers; ++i) {
consumerThreads.emplace_back(consumer, std::ref(geodata), std::ref(stats), std::ref(clientConfig));
}
// Accept incoming connections and create producer threads
while (true) {
socklen_t size = sizeof(struct sockaddr_in);
struct sockaddr_in their_addr;
int client_fd = accept(sock, (struct sockaddr*)&their_addr, &size);
if (client_fd == -1) {
perror("accept");
continue; // Skip to the next connection attempt on failure
}
// Create a Client object for the new connection
Client* client = new Client(&geodata, client_fd, clientConfig);
client->setStatistic(&stats);
if (pthread_create(&thread, NULL, connection_thread, client) != 0) {
fprintf(stderr, "Failed to create producer thread\n");
delete client; // Cleanup client object on failure
}
}
// Wait for all consumer threads to finish (will not happen in this loop)
for (auto& thread : consumerThreads) {
thread.join();
}
close(sock); // Close the listening socket when the server shuts down
return 0;
}
#pragma once
#include
#include "geodata.h"
#include
#include
#include
#include
#include
#include "../proto/message.h"
#include
#include
#include
#include
/**
* @brief Structure to hold statistics about the number of messages processed.
*/
struct Statistic {
std::atomic countMsgs{0};
std::atomic countCacheHits{0};
std::atomic countCacheMisses{0};
};
/**
* @brief Structure to hold the client configuarations.
*/
struct ClientConfig {
size_t write_buffer_size;
size_t read_bytes_buffer_size;
size_t read_buffer_size;
};
/**
* @brief Thread-safe queue for multiple producers and consumers.
*
* Provides synchronized access to a queue for multiple producers and consumers.
*/
template
class ThreadSafeQueue {
private:
std::queue queue;
std::mutex mutex;
std::condition_variable cv;
public:
void enqueue(T item) {
{
std::lock_guard lock(mutex);
queue.push(std::move(item));
}
cv.notify_one();
}
T dequeue() {
std::unique_lock lock(mutex);
cv.wait(lock, [this] { return !queue.empty(); });
T item = std::move(queue.front());
queue.pop();
return item;
}
bool isEmpty() {
std::lock_guard lock(mutex);
return queue.empty();
}
};
/**
* @class Client
* @brief Represents a client connection and manages the communication between the server and client.
*
* The Client class handles receiving messages from the client, processing location updates,
* and sending responses back to the client. It also uses GEOS (Geometry Engine Open Source)
* for location-based queries and maintains statistics about message processing.
*/
class Client {
int client_fd; ///< The file descriptor for the client connection.
GeoData *geo; ///< A pointer to GeoData used for querying locations.
Statistic *stats = nullptr; ///< A pointer to the statistics object for tracking processed messages.
GEOSContextHandle_t geos_ctx;
ClientConfig &config;
static thread_local std::vector write_buffer;
static thread_local std::vector read_bytes_buffer;
static thread_local std::vector read_buffer;
static thread_local size_t write_buffer_offset;
static thread_local size_t read_bytes_buffer_offset;
static thread_local unsigned short write_buffer_repeat;
public:
/**
* @brief Constructor to initialize the Client object.
*
* Initializes the client object with the socket file descriptor and GeoData.
* A new GEOS context is created for geospatial operations.
*
* @param geo Pointer to the GeoData object used for querying location data.
* @param socket The socket file descriptor for client communication.
*/
Client(GeoData *geo, int socket, ClientConfig &config) : client_fd(socket), geo(geo), config(config) {
geos_ctx = geo->newContext();
if(write_buffer.empty()) {
write_buffer.resize(config.write_buffer_size);
}
if(read_bytes_buffer.empty()) {
read_bytes_buffer.resize(config.read_bytes_buffer_size);
}
if(read_buffer.empty()) {
read_buffer.resize(config.read_buffer_size);
}
}
/**
* @brief Destructor to clean up resources.
*
* Closes the client socket and cleans up the GEOS context.
*/
~Client() {
flushWriteBuffer();
close(client_fd);
GEOS_finish_r(geos_ctx);
}
/**
* @brief Set the statistics object to track message count.
*
* Associates a Statistic object with the client to record the number of messages processed.
*
* @param stats Pointer to a Statistic object.
*/
void setStatistic(Statistic *stats) { this->stats = stats; }
/**
* @brief Reads a message from the client socket.
*
* Reads a `wire::ClientMessage` from the client. The message is prefixed with
* a 4-byte length header indicating the size of the message.
*
* @param msg Reference to the `wire::ClientMessage` object to be populated with data.
* @return true if the message was successfully read, false otherwise.
*/
bool readMessage(ClientMessage &msg);
/**
* @brief Reads a specified number of bytes from the client socket into the provided buffer.
*
* This function now uses a thread-local buffer to minimize repeated allocations.
*
* @param client_fd The file descriptor to read from.
* @param buffer The buffer to copy data into.
* @param len The number of bytes to read.
* @return true if the specified number of bytes was successfully read, false otherwise.
*/
static bool read_bytes(int client_fd, char *buf, size_t len);
/**
* @brief Sends a response message to the client socket.
*
* Buffers the `wire::ClientResponse` to be sent, flushing only when the buffer is full.
*
* @param msg The `wire::ClientResponse` to send.
* @return true if the message was successfully buffered or sent, false otherwise.
*/
bool writeMessage(ClientResponse &msg);
/**
* @brief Flushes the write buffer to the client socket.
*
* Ensures all buffered messages are sent to the client.
*
* @return true if the buffer was successfully flushed, false otherwise.
*/
bool flushWriteBuffer();
/**
* @brief Processes a location update request.
*
* This method processes the location data contained in the `wire::LocationUpdateRequest`,
* performs a geospatial query based on longitude and latitude, and populates the
* `wire::LocationUpdateResponse` with the results.
*
* @param msg The `wire::LocationUpdateRequest` containing the car's location data.
* @param resp The `wire::LocationUpdateResponse` to populate with the result of the query.
* @return true if the location update was successfully processed, false otherwise.
*/
bool processLocationUpdates(const LocationUpdateRequest &msg,
LocationUpdateResponse &resp);
/**
* @brief Resize read and write buffers.
*
* This method resizes the read and write buffers.
*
* @param write_size The write buffer size.
* @param read_bytes_size The read bytes buffer size.
* @param read_size The read buffer size.
*/
static void updateBufferSize(ClientConfig &config);
bool isCacheHit(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
bool queryLocation(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);
ClientConfig & getConfig();
};
/**
* @brief Function to process client messages using the consumer role.
*
* This function is run in worker threads to handle messages from the shared queue.
* It dequeues messages, processes location updates, and sends responses back to clients.
*
* @param geodata Reference to the GeoData object for geospatial queries.
* @param stats Reference to the statistics object to track processed messages.
*/
void consumer(GeoData& geodata, Statistic& stats, ClientConfig& config);
Подробнее здесь: [url]https://stackoverflow.com/questions/79333777/c-multithreading-problem-how-do-i-return-the-correct-message-to-the-correct[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия