Планировщик:
Код: Выделить всё
# Start the Ray head node in the background, advertising one unit of the
# custom "is_worker" resource on this node.
# NOTE: the --node-ip-address value is blank in the original post; fill in
# the head node's IP before running.
# Every option line except the last must end with a backslash; otherwise the
# shell treats the following line as a separate command.
nohup ray start --head \
--node-ip-address= \
--port=6379 \
--resources='{"is_worker": 1}' \
--dashboard-host=localhost &
Код: Выделить всё
# Start a Ray worker node in the background and join it to the head node on
# port 6379, advertising one unit of the custom "is_worker" resource.
# NOTE: the host part of --address and the --node-ip-address value are blank
# in the original post; fill them in before running.
# Every option line except the last must end with a backslash; otherwise the
# shell treats the following line as a separate command.
nohup ray start \
--address=':6379' \
--resources='{"is_worker": 1}' \
--node-ip-address= &
Код: Выделить всё
class DynamicPoolManager:
    """Keep a Ray ``ActorPool`` sized to the CPUs the cluster reports as free.

    Call :meth:`scale` whenever cluster capacity may have changed. It creates
    or retires ``worker.Actor`` handles and then rebuilds ``self.pool`` from
    the current worker list.
    """

    def __init__(self, actor_per_cpu, mongouri, embeddings):
        """Store the configuration and perform the initial scale.

        Args:
            actor_per_cpu: Multiplier applied to the available CPU count to
                obtain the target number of actors.
            mongouri: Passed as the first argument to every new actor.
            embeddings: Passed as the second argument to every new actor.
        """
        self.workers = []
        self.pool = None
        self.mongouri = mongouri
        self.embeddings = embeddings
        self.actor_per_cpu = actor_per_cpu
        self.scale()  # Initial scale

    def scale(self):
        """Grow or shrink the worker list to the target, then rebuild the pool."""
        # ray.available_resources() is the "polite" choice (only what is free
        # right now); ray.cluster_resources() would claim the whole cluster.
        total_cpus = int(ray.available_resources().get("CPU", 0))

        # NOTE: the CPUs of the node running this scheduler are deliberately
        # still counted. The original looked them up but never subtracted
        # them, so that unused lookup has been removed.
        num_actors = int(total_cpus * self.actor_per_cpu)
        current = len(self.workers)

        if current < num_actors:
            # Scale up: create the missing actors.
            new_count = num_actors - current
            log(f"Resources added. Creating {new_count} actors")
            self.workers.extend(
                worker.Actor.remote(self.mongouri, self.embeddings)
                for _ in range(new_count)
            )
        elif current > num_actors:
            # Scale down (crucial for the autoscaler): retire surplus actors,
            # newest first.
            to_remove = current - num_actors
            log(f"Resources removed. Removing {to_remove} actors")
            for _ in range(to_remove):
                victim = self.workers.pop()
                # Assumes the actor class exposes a `shutdown` method; it is
                # not shown in the visible snippet - TODO confirm.
                victim.shutdown.remote()

        # The pool is rebuilt on every call, even when the worker list did
        # not change.
        self.pool = ActorPool(self.workers)
Код: Выделить всё
@ray.remote(resources={"is_worker": 1})
class Actor:
    """Ray actor that requests one unit of the custom ``is_worker`` resource.

    NOTE(review): per Ray's custom-resource accounting, a node started with
    ``--resources='{"is_worker": 1}'`` can presumably host only one such
    actor at a time - confirm against the Ray documentation.
    """

    def __init__(self, mongo_uri, embeddings):
        """Initialise the actor and log that it is ready.

        Args:
            mongo_uri: Supplied by the caller; its use is elided in the
                original snippet.
            embeddings: Supplied by the caller; its use is elided in the
                original snippet.
        """
        # INIT LOGIC
        log("Actor ready")

    def run(self):
        """Run the actor's task (body elided in the original snippet).

        ``self`` is required: without it, a call through the actor handle
        fails because Python passes the instance as the first argument.
        """
        # TASK
Без is_worker в декораторе Actor и в командах ray start результат тот же: головному узлу не назначаются никакие задачи и акторы.
Подробнее здесь: https://stackoverflow.com/questions/798 ... sks-actors
Мобильная версия