Проблема многопоточности C++ – как вернуть правильное сообщение правильному клиенту?C++

Программы на C++. Форум разработчиков
Ответить
Anonymous
 Проблема многопоточности C++ – как вернуть правильное сообщение правильному клиенту?

Сообщение Anonymous »

В настоящее время я изучаю многопоточность для C++.
Для контекста у меня есть серверный код, который в основном принимает сообщение protobuf со случайной координатой в Германии и возвращает ответ (название района ) клиенту. Для этого я использовал свои собственные геоданные и т. д. Поскольку обработки местоположения недостаточно для одного потока, я решил сделать это своего рода многопоточностью производитель-потребитель, то есть заставить обработку местоположения использовать несколько потоков.
Проблема в том, что , почему-то, когда я использую несколько экземпляров клиента (больше потоков и подключений от клиента), сообщения не возвращаются правильно. В некоторых экземплярах клиента регистрировались ошибки, например, клиент получил ответ на автомобиль, который не дождался ответа.
Прикреплен код соответствующих файлов. Я бы с радостью ответил или дал больше контекста, если это необходимо. Спасибо за помощь!
Connection.cc
void consumer(GeoData &geodata, Statistic &globalStats, ClientConfig &config) {
Client::updateBufferSize(config);

while (true) {
// Directly dequeue a single message from the shared queue
auto [client, msg] = sharedQueue.dequeue();

if (!client) {
std::cerr processLocationUpdates(msg.location_request, resp.location_response);

// Send the response back to the client
if (!client->writeMessage(resp)) {
std::cerr ai_family, res->ai_socktype, res->ai_protocol);
if (sock == -1) {
perror("socket");
return 1;
}

if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
perror("setsockopt");
return 1;
}

// Bind to the address
if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
perror("bind");
return 0;
}

freeaddrinfo(res);

if (listen(sock, BACKLOG) == -1) {
perror("listen");
return 0;
}

GeoData geodata;
if (!geodata.loadData("geodata/build/bezirke-final.geodata", nodeCapacity)) {
die("Could not load geodata");
}

// stats thread
Statistic stats;
if (pthread_create(&thread, NULL, statistic_thread, &stats) != 0) {
die("Failed to create statistics thread");
}

signal(SIGPIPE, SIG_IGN);

// pool of consumer threads
const int numConsumers = 4; // we can play around with this parameter of num of consumers
std::vector consumerThreads;
for (int i = 0; i < numConsumers; ++i) {
consumerThreads.emplace_back(consumer, std::ref(geodata), std::ref(stats), std::ref(clientConfig));
}

// Accept incoming connections and create producer threads
while (true) {
socklen_t size = sizeof(struct sockaddr_in);
struct sockaddr_in their_addr;
int client_fd = accept(sock, (struct sockaddr*)&their_addr, &size);
if (client_fd == -1) {
perror("accept");
continue; // Skip to the next connection attempt on failure
}

// Create a Client object for the new connection
Client* client = new Client(&geodata, client_fd, clientConfig);
client->setStatistic(&stats);

if (pthread_create(&thread, NULL, connection_thread, client) != 0) {
fprintf(stderr, "Failed to create producer thread\n");
delete client; // Cleanup client object on failure
}
}

// Wait for all consumer threads to finish (will not happen in this loop)
for (auto& thread : consumerThreads) {
thread.join();
}

close(sock); // Close the listening socket when the server shuts down
return 0;
}


#pragma once

#include
#include "geodata.h"
#include
#include
#include
#include
#include
#include "../proto/message.h"
#include
#include
#include
#include

/**
* @brief Structure to hold statistics about the number of messages processed.
*/
struct Statistic {
std::atomic countMsgs{0};
std::atomic countCacheHits{0};
std::atomic countCacheMisses{0};
};

/**
* @brief Structure to hold the client configuarations.
*/
struct ClientConfig {
size_t write_buffer_size;
size_t read_bytes_buffer_size;
size_t read_buffer_size;
};

/**
* @brief Thread-safe queue for multiple producers and consumers.
*
* Provides synchronized access to a queue for multiple producers and consumers.
*/
template
class ThreadSafeQueue {
private:
std::queue queue;
std::mutex mutex;
std::condition_variable cv;

public:
void enqueue(T item) {
{
std::lock_guard lock(mutex);
queue.push(std::move(item));
}
cv.notify_one();
}

T dequeue() {
std::unique_lock lock(mutex);
cv.wait(lock, [this] { return !queue.empty(); });
T item = std::move(queue.front());
queue.pop();
return item;
}

bool isEmpty() {
std::lock_guard lock(mutex);
return queue.empty();
}
};

/**
* @class Client
* @brief Represents a client connection and manages the communication between the server and client.
*
* The Client class handles receiving messages from the client, processing location updates,
* and sending responses back to the client. It also uses GEOS (Geometry Engine Open Source)
* for location-based queries and maintains statistics about message processing.
*/
class Client {
int client_fd; ///< The file descriptor for the client connection.
GeoData *geo; ///< A pointer to GeoData used for querying locations.
Statistic *stats = nullptr; ///< A pointer to the statistics object for tracking processed messages.

GEOSContextHandle_t geos_ctx;
ClientConfig &config;

static thread_local std::vector write_buffer;
static thread_local std::vector read_bytes_buffer;
static thread_local std::vector read_buffer;
static thread_local size_t write_buffer_offset;
static thread_local size_t read_bytes_buffer_offset;
static thread_local unsigned short write_buffer_repeat;

public:
/**
* @brief Constructor to initialize the Client object.
*
* Initializes the client object with the socket file descriptor and GeoData.
* A new GEOS context is created for geospatial operations.
*
* @param geo Pointer to the GeoData object used for querying location data.
* @param socket The socket file descriptor for client communication.
*/
Client(GeoData *geo, int socket, ClientConfig &config) : client_fd(socket), geo(geo), config(config) {
geos_ctx = geo->newContext();

if(write_buffer.empty()) {
write_buffer.resize(config.write_buffer_size);
}

if(read_bytes_buffer.empty()) {
read_bytes_buffer.resize(config.read_bytes_buffer_size);
}

if(read_buffer.empty()) {
read_buffer.resize(config.read_buffer_size);
}
}

/**
* @brief Destructor to clean up resources.
*
* Closes the client socket and cleans up the GEOS context.
*/
~Client() {
flushWriteBuffer();
close(client_fd);
GEOS_finish_r(geos_ctx);
}

/**
* @brief Set the statistics object to track message count.
*
* Associates a Statistic object with the client to record the number of messages processed.
*
* @param stats Pointer to a Statistic object.
*/
void setStatistic(Statistic *stats) { this->stats = stats; }

/**
* @brief Reads a message from the client socket.
*
* Reads a `wire::ClientMessage` from the client. The message is prefixed with
* a 4-byte length header indicating the size of the message.
*
* @param msg Reference to the `wire::ClientMessage` object to be populated with data.
* @return true if the message was successfully read, false otherwise.
*/
bool readMessage(ClientMessage &msg);

/**
* @brief Reads a specified number of bytes from the client socket into the provided buffer.
*
* This function now uses a thread-local buffer to minimize repeated allocations.
*
* @param client_fd The file descriptor to read from.
* @param buffer The buffer to copy data into.
* @param len The number of bytes to read.
* @return true if the specified number of bytes was successfully read, false otherwise.
*/
static bool read_bytes(int client_fd, char *buf, size_t len);

/**
* @brief Sends a response message to the client socket.
*
* Buffers the `wire::ClientResponse` to be sent, flushing only when the buffer is full.
*
* @param msg The `wire::ClientResponse` to send.
* @return true if the message was successfully buffered or sent, false otherwise.
*/
bool writeMessage(ClientResponse &msg);

/**
* @brief Flushes the write buffer to the client socket.
*
* Ensures all buffered messages are sent to the client.
*
* @return true if the buffer was successfully flushed, false otherwise.
*/
bool flushWriteBuffer();

/**
* @brief Processes a location update request.
*
* This method processes the location data contained in the `wire::LocationUpdateRequest`,
* performs a geospatial query based on longitude and latitude, and populates the
* `wire::LocationUpdateResponse` with the results.
*
* @param msg The `wire::LocationUpdateRequest` containing the car's location data.
* @param resp The `wire::LocationUpdateResponse` to populate with the result of the query.
* @return true if the location update was successfully processed, false otherwise.
*/
bool processLocationUpdates(const LocationUpdateRequest &msg,
LocationUpdateResponse &resp);

/**
* @brief Resize read and write buffers.
*
* This method resizes the read and write buffers.
*
* @param write_size The write buffer size.
* @param read_bytes_size The read bytes buffer size.
* @param read_size The read buffer size.
*/
static void updateBufferSize(ClientConfig &config);

bool isCacheHit(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);

bool queryLocation(const LocationUpdateRequest &msg, LocationUpdateResponse &resp);

ClientConfig & getConfig();
};

/**
* @brief Function to process client messages using the consumer role.
*
* This function is run in worker threads to handle messages from the shared queue.
* It dequeues messages, processes location updates, and sends responses back to clients.
*
* @param geodata Reference to the GeoData object for geospatial queries.
* @param stats Reference to the statistics object to track processed messages.
*/
void consumer(GeoData& geodata, Statistic& stats, ClientConfig& config);


Подробнее здесь: https://stackoverflow.com/questions/793 ... he-correct
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C++»