Я пытаюсь настроить распределенную среду для обработки НЛП. Я использую Ray и Python в GCP. У меня есть хозяин и несколько рабочих. Происходит следующее: когда я запускаю 1 или 8 рабочих, для получения 10 результатов требуется одинаковое количество времени. Никакого параллелизма вообще нет. Я боролся с этим уже несколько часов, но так и не понял.
Вот мой код планировщика:
import ray
from pymongo import MongoClient
import worker # your worker.py will define the Ray actor
import logging, threading, time
from datetime import datetime
def start_orchestration():
logging.info("--- Starting orchestration ---")
ray.init(address="auto") # connect to Ray cluster (or just ray.init() locally)
cpus_per_actor = 2
nlp_actors = []
def scale_actors():
resources = ray.cluster_resources()
total_cpus = int(resources.get("CPU", 0))
desired = total_cpus // cpus_per_actor
print(f"Desired actors={desired}, current={len(nlp_actors)}")
# Add actors if needed
while len(nlp_actors) < desired:
new_actor = worker.NLPActor.options(num_cpus=cpus_per_actor).remote(
"gs://babyllm-data/embeddings.pkl"
)
nlp_actors.append(new_actor)
# Remove actors if cluster shrank
while len(nlp_actors) > desired:
actor_to_remove = nlp_actors.pop()
ray.kill(actor_to_remove)
def scaler_loop(interval=30):
while True:
try:
scale_actors()
except Exception as e:
logging.error(f"[Scaler] Error {e}")
time.sleep(interval)
# Initial scale
scale_actors()
# Run scaler in background
threading.Thread(target=scaler_loop, daemon=True).start()
logging.info("--- NLPActor started ---")
# Mongo Connection
mongo = MongoClient("mongodb+srv://....../")
col = mongo["BabyLLM"]["simple_texts_wiki_v2"]
MAX_IN_FLIGHT = len(nlp_actors) * 2
logging.info("--- Processing Docs ---")
active_futures = []
cursor = col.find({"text_l1": {"$exists": False}})#{"text_mini": {"$exists": False}})
counter = 0
start_time = datetime.now()
for doc in cursor:
# 1. Submit the task and keep moving
actor = nlp_actors[counter % len(nlp_actors)]
f = actor.extract_and_replace.remote(doc["_id"], doc["title"], doc["text"][:1000])
active_futures.append(f)
# 2. Only wait once we've filled our "pipeline" buffer
if len(active_futures) >= MAX_IN_FLIGHT:
# Get whichever task finishes FIRST (not necessarily the one we just sent)
done, active_futures = ray.wait(active_futures, num_returns=1)
for finished in done:
try:
res = ray.get(finished)
if res:
# Save immediately as requested
# col.update_one(
# {"_id": res["_id"]},
# {"$set": {
# "text_l1": res["text_mini_l1"]
# # "text_mini": res["text_mini"],
# # "text_mini_l1": res["text_mini_l1"]
# }}
# )
counter += 1
if counter % 10 == 0:
print(f"Saved {counter} documents total. Took {(datetime.now() - start_time).total_seconds()} seconds.")
start_time = datetime.now()
except Exception as e:
logging.error(f"Task failed: {e}")
# 3. Clean up the final tasks left in the pipeline
while active_futures:
done, active_futures = ray.wait(active_futures, num_returns=1)
for finished in done:
res = ray.get(finished)
if res:
col.update_one({"_id": res["_id"]}, {"$set": {"text_mini": res["text_mini"]}})
counter += 1
logging.info("--- Job Complete ---")
if __name__ == "__main__":
start_orchestration()
Подробнее здесь: https://stackoverflow.com/questions/798 ... all-on-gcp
Распределенные вычисления с использованием Ray/Python вообще не имеют параллелизма в GCP. ⇐ Python
Программы на Python
1772326555
Anonymous
Я пытаюсь настроить распределенную среду для обработки НЛП. Я использую Ray и Python в GCP. У меня есть хозяин и несколько рабочих. Происходит следующее: когда я запускаю 1 или 8 рабочих, для получения 10 результатов требуется одинаковое количество времени. Никакого параллелизма вообще нет. Я боролся с этим уже несколько часов, но так и не понял.
Вот мой код планировщика:
import ray
from pymongo import MongoClient
import worker # your worker.py will define the Ray actor
import logging, threading, time
from datetime import datetime
def start_orchestration():
logging.info("--- Starting orchestration ---")
ray.init(address="auto") # connect to Ray cluster (or just ray.init() locally)
cpus_per_actor = 2
nlp_actors = []
def scale_actors():
resources = ray.cluster_resources()
total_cpus = int(resources.get("CPU", 0))
desired = total_cpus // cpus_per_actor
print(f"Desired actors={desired}, current={len(nlp_actors)}")
# Add actors if needed
while len(nlp_actors) < desired:
new_actor = worker.NLPActor.options(num_cpus=cpus_per_actor).remote(
"gs://babyllm-data/embeddings.pkl"
)
nlp_actors.append(new_actor)
# Remove actors if cluster shrank
while len(nlp_actors) > desired:
actor_to_remove = nlp_actors.pop()
ray.kill(actor_to_remove)
def scaler_loop(interval=30):
while True:
try:
scale_actors()
except Exception as e:
logging.error(f"[Scaler] Error {e}")
time.sleep(interval)
# Initial scale
scale_actors()
# Run scaler in background
threading.Thread(target=scaler_loop, daemon=True).start()
logging.info("--- NLPActor started ---")
# Mongo Connection
mongo = MongoClient("mongodb+srv://....../")
col = mongo["BabyLLM"]["simple_texts_wiki_v2"]
MAX_IN_FLIGHT = len(nlp_actors) * 2
logging.info("--- Processing Docs ---")
active_futures = []
cursor = col.find({"text_l1": {"$exists": False}})#{"text_mini": {"$exists": False}})
counter = 0
start_time = datetime.now()
for doc in cursor:
# 1. Submit the task and keep moving
actor = nlp_actors[counter % len(nlp_actors)]
f = actor.extract_and_replace.remote(doc["_id"], doc["title"], doc["text"][:1000])
active_futures.append(f)
# 2. Only wait once we've filled our "pipeline" buffer
if len(active_futures) >= MAX_IN_FLIGHT:
# Get whichever task finishes FIRST (not necessarily the one we just sent)
done, active_futures = ray.wait(active_futures, num_returns=1)
for finished in done:
try:
res = ray.get(finished)
if res:
# Save immediately as requested
# col.update_one(
# {"_id": res["_id"]},
# {"$set": {
# "text_l1": res["text_mini_l1"]
# # "text_mini": res["text_mini"],
# # "text_mini_l1": res["text_mini_l1"]
# }}
# )
counter += 1
if counter % 10 == 0:
print(f"Saved {counter} documents total. Took {(datetime.now() - start_time).total_seconds()} seconds.")
start_time = datetime.now()
except Exception as e:
logging.error(f"Task failed: {e}")
# 3. Clean up the final tasks left in the pipeline
while active_futures:
done, active_futures = ray.wait(active_futures, num_returns=1)
for finished in done:
res = ray.get(finished)
if res:
col.update_one({"_id": res["_id"]}, {"$set": {"text_mini": res["text_mini"]}})
counter += 1
logging.info("--- Job Complete ---")
if __name__ == "__main__":
start_orchestration()
Подробнее здесь: [url]https://stackoverflow.com/questions/79898673/distributed-computing-with-ray-python-has-no-parallelism-at-all-on-gcp[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия