Распределенные вычисления с использованием Ray/Python вообще не имеют параллелизма в GCP.Python

Программы на Python
Ответить
Anonymous
 Распределенные вычисления с использованием Ray/Python вообще не имеют параллелизма в GCP.

Сообщение Anonymous »

Я пытаюсь настроить распределенную среду для обработки НЛП. Я использую Ray и Python в GCP. У меня есть хозяин и несколько рабочих. Происходит следующее: когда я запускаю 1 или 8 рабочих, для получения 10 результатов требуется одинаковое количество времени. Никакого параллелизма вообще нет. Я боролся с этим уже несколько часов, но так и не понял.
Вот мой код планировщика:
import ray
from pymongo import MongoClient
import worker # your worker.py will define the Ray actor
import logging, threading, time
from datetime import datetime

def start_orchestration():
logging.info("--- Starting orchestration ---")
ray.init(address="auto") # connect to Ray cluster (or just ray.init() locally)

cpus_per_actor = 2
nlp_actors = []

def scale_actors():
resources = ray.cluster_resources()
total_cpus = int(resources.get("CPU", 0))
desired = total_cpus // cpus_per_actor

print(f"Desired actors={desired}, current={len(nlp_actors)}")

# Add actors if needed
while len(nlp_actors) < desired:
new_actor = worker.NLPActor.options(num_cpus=cpus_per_actor).remote(
"gs://babyllm-data/embeddings.pkl"
)
nlp_actors.append(new_actor)

# Remove actors if cluster shrank
while len(nlp_actors) > desired:
actor_to_remove = nlp_actors.pop()
ray.kill(actor_to_remove)

def scaler_loop(interval=30):
while True:
try:
scale_actors()
except Exception as e:
logging.error(f"[Scaler] Error {e}")
time.sleep(interval)

# Initial scale
scale_actors()

# Run scaler in background
threading.Thread(target=scaler_loop, daemon=True).start()

logging.info("--- NLPActor started ---")

# Mongo Connection
mongo = MongoClient("mongodb+srv://....../")
col = mongo["BabyLLM"]["simple_texts_wiki_v2"]

MAX_IN_FLIGHT = len(nlp_actors) * 2

logging.info("--- Processing Docs ---")
active_futures = []
cursor = col.find({"text_l1": {"$exists": False}})#{"text_mini": {"$exists": False}})
counter = 0

start_time = datetime.now()
for doc in cursor:
# 1. Submit the task and keep moving
actor = nlp_actors[counter % len(nlp_actors)]
f = actor.extract_and_replace.remote(doc["_id"], doc["title"], doc["text"][:1000])
active_futures.append(f)

# 2. Only wait once we've filled our "pipeline" buffer
if len(active_futures) >= MAX_IN_FLIGHT:
# Get whichever task finishes FIRST (not necessarily the one we just sent)
done, active_futures = ray.wait(active_futures, num_returns=1)

for finished in done:
try:
res = ray.get(finished)
if res:
# Save immediately as requested
# col.update_one(
# {"_id": res["_id"]},
# {"$set": {
# "text_l1": res["text_mini_l1"]
# # "text_mini": res["text_mini"],
# # "text_mini_l1": res["text_mini_l1"]
# }}
# )
counter += 1
if counter % 10 == 0:
print(f"Saved {counter} documents total. Took {(datetime.now() - start_time).total_seconds()} seconds.")
start_time = datetime.now()
except Exception as e:
logging.error(f"Task failed: {e}")

# 3. Clean up the final tasks left in the pipeline
while active_futures:
done, active_futures = ray.wait(active_futures, num_returns=1)
for finished in done:
res = ray.get(finished)
if res:
col.update_one({"_id": res["_id"]}, {"$set": {"text_mini": res["text_mini"]}})
counter += 1

logging.info("--- Job Complete ---")

if __name__ == "__main__":
start_orchestration()


Подробнее здесь: https://stackoverflow.com/questions/798 ... all-on-gcp
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»