Код: Выделить всё
// 头文件部分
#pragma once
#include
#include
#include
#include
#include
#include
#include // 标准库读写锁
struct OperationContext {
std::string operation_id;
EventData conflict_info;
};
class ResourceTracker {
public:
ResourceTracker() = default;
~ResourceTracker() = default;
std::pair CheckOverlap(const std::string& resource_name);
// 如果之前是空的并且插入成功则返回true
Result AddActiveRecord(const std::string& resource_name, const OperationContext& context);
// 如果删除后变为空则返回true
Result RemoveActiveRecord(const std::string& resource_name);
private:
std::shared_mutex lock_; // 使用标准库读写锁
std::unordered_map active_records_; // 资源名称 -> 操作上下文
};
// 树状结构的资源跟踪器
// 并发模型:
// 1. CheckOverlap 和 AddActiveRecord 可以并发: CheckOverlap 只会被单线程调用
// 当有待报告的任务时,CheckOverlap 会被阻塞
// 2. RemoveActiveRecord 可以与其他操作并发调用
// 3. ResourceNode 的 lock_ 字段保护 sub_nodes 结构
class ResourceNode {
public:
explicit ResourceNode(uint32_t depth);
~ResourceNode() = default;
std::pair CheckOverlap(const std::vector& path_components);
Result AddActiveRecord(const std::vector& path_components,
const OperationContext& context);
// 如果所有桶都为空且没有子节点则返回true
Result RemoveActiveRecord(const std::vector& path_components);
// 检查是否有非空桶或子节点
bool HasActiveRecordsUnsafe() const;
bool TryRemoveChildNode(const std::string& name);
private:
uint32_t GetBucketIndex(const std::string& resource_name);
uint32_t depth_;
std::atomic active_bucket_count_{0};
std::vector buckets_;
std::shared_mutex lock_; // 使用标准库读写锁
std::unordered_map sub_nodes_;
};
class ResourceManager {
public:
ResourceManager(std::string prefix_type1, std::string prefix_type2);
std::pair CheckOverlap(const std::string& resource_path);
Result RegisterNewOperation(const std::shared_ptr& operation);
Result CompleteOperation(const OperationInfo& operation);
std::vector ParseResourcePath(const std::string& resource_path);
private:
std::string prefix_type1_; // 第一类资源前缀
std::string prefix_type2_; // 第二类资源前缀
std::shared_ptr root_node_;
};
// 实现文件部分
uint32_t ResourceNode::GetBucketIndex(const std::string& resource_name) {
return std::hash{}(resource_name.at(0)) % CONFIG_RESOURCE_BUCKETS_NUM;
}
std::pair ResourceTracker::CheckOverlap(const std::string& resource_name) {
std::shared_lock guard(lock_); // 使用共享锁(读锁)
auto itr = active_records_.find(resource_name);
if (itr == active_records_.end()) {
return {false, {}};
}
return {true, itr->second};
}
Result ResourceTracker::RemoveActiveRecord(const std::string& resource_name) {
std::unique_lock guard(lock_); // 使用独占锁(写锁)
auto itr = active_records_.find(resource_name);
if (itr == active_records_.end()) {
return ERROR_NOT_FOUND;
}
active_records_.erase(itr);
return active_records_.empty();
}
Result ResourceTracker::AddActiveRecord(const std::string& resource_name,
const OperationContext& context) {
std::unique_lock guard(lock_); // 使用独占锁(写锁)
bool empty_before_insert = active_records_.empty();
auto itr = active_records_.find(resource_name);
if (itr != active_records_.end() && itr->second.operation_id != context.operation_id) {
LOG(ERROR) depth_) {
std::shared_ptr child_node;
{
std::unique_lock guard(lock_);
auto itr = sub_nodes_.find(path_components[depth_]);
if (itr == sub_nodes_.end()) {
auto ret = sub_nodes_.emplace(path_components[depth_],
std::make_shared(depth_ + 1));
CHECK(ret.second);
child_node = ret.first->second;
} else {
child_node = itr->second;
}
}
return child_node->AddActiveRecord(path_components, context);
}
// max_depth == depth_
uint32_t bucket_index = GetBucketIndex(path_components[depth_]);
auto add_ret = buckets_[bucket_index]->AddActiveRecord(path_components[depth_], context);
if (add_ret.IsOk() && add_ret.Get()) {
active_bucket_count_.fetch_add(1, std::memory_order_release);
}
return add_ret;
}
bool ResourceNode::HasActiveRecordsUnsafe() const {
return active_bucket_count_.load(std::memory_order_acquire) > 0 || !sub_nodes_.empty();
}
bool ResourceNode::TryRemoveChildNode(const std::string& name) {
std::unique_lock guard(lock_);
auto itr = sub_nodes_.find(name);
if (itr == sub_nodes_.end()) {
return !HasActiveRecordsUnsafe();
}
if (itr->second->HasActiveRecordsUnsafe()) {
return false;
}
sub_nodes_.erase(itr);
return !HasActiveRecordsUnsafe();
}
Result ResourceNode::RemoveActiveRecord(const std::vector& path_components) {
if (path_components.empty()) {
return ERROR_INVALID_ARGUMENT;
}
if (path_components.size() - 1 > depth_) {
std::unique_ptr child_node;
{
std::shared_lock guard(lock_); // 使用共享锁(读锁)
auto itr = sub_nodes_.find(path_components[depth_]);
if (itr == sub_nodes_.end()) {
return ERROR_NOT_FOUND;
} else {
child_node = itr->second;
}
}
auto del_ret = child_node->RemoveActiveRecord(path_components);
if (del_ret.IsOk() && del_ret.Get()) {
return TryRemoveChildNode(path_components[depth_]);
}
return del_ret;
} else if (path_components.size() - 1 == depth_) {
auto del_ret = buckets_[GetBucketIndex(path_components[depth_])]->RemoveActiveRecord(
path_components[depth_]);
if (del_ret.IsOk()) {
active_bucket_count_.fetch_sub(1, std::memory_order_release);
if (del_ret.Get()) {
return TryRemoveChildNode(path_components[depth_]);
}
return del_ret;
}
return del_ret;
}
return ERROR_INTERNAL;
}
Предположим, что A/b/d/E уже добавлен через Addactiverecord
В момент времени t1, Thread1 Addactiverecord, попытка добавить/b/c и достигнуто Aut -ard -ard_ret od -let_ret leptor leptor leptor leptor leptor leptor a add_ret Buckets_ [bucket_index]-> addactiverecord (path_components [devin_], context);, но на самом деле не выполнил addactiverecord
в момент времени t2, thread2 вызывает removauctiverecord a/b/d/e и уже достиг tryRemovechildnode максимального Node, then o There-sub_nodes_. T3, Thread1 Наконец -то выполняет addactiverecord
Таким образом, новый ресурсов, добавленный Thread1, будет удален с помощью Thread2. Как эта проблема может быть решена?
Подробнее здесь: https://stackoverflow.com/questions/795 ... ffectively
Мобильная версия