Проблема с тензорами pytorch и несколькими графическими процессорами при использовании DataParallelPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Проблема с тензорами pytorch и несколькими графическими процессорами при использовании DataParallel

Сообщение Anonymous »

У меня есть большой код машинного обучения, который я пишу в течение нескольких месяцев, и я начал процесс распараллеливания данных для работы с несколькими графическими процессорами. Начнем с того, что код отлично работает при использовании одного графического процессора; проблема возникает при использовании нескольких графических процессоров.
Ошибка следующая: RuntimeError: Ожидалось, что все тензоры будут на одном устройстве, но обнаружено как минимум два устройства: cuda:1 и cuda: 0! (при проверке аргумента для индекса аргумента в методе Wrapper_CUDA__index_select)
Фрагменты соответствующего кода можно найти ниже:
Модель файл

Код: Выделить всё

import torch
from torch import nn
from functools import partial
import copy

from ..mlp import MLP
from ..basis import gaussian, bessel
from ..conv import GatedGCN

class Encoder(nn.Module):
"""ALIGNN/ALIGNN-d Encoder.
The encoder must take a PyG graph object `data` and output the same `data`
with additional fields `h_atm`, `h_bnd`, and `h_ang` that correspond to the atom, bond, and angle embedding.

The input `data` must have three fields `x_atm`, `x_bnd`, and `x_ang` that describe the atom type
(in onehot vectors), the bond lengths, and bond/dihedral angles (in radians).
"""

def __init__(self, num_species, cutoff, dim=128, dihedral=False):
super().__init__()
self.num_species = num_species
self.cutoff = cutoff
self.dim = dim
self.dihedral = dihedral

self.embed_atm = nn.Sequential(MLP([num_species, dim, dim], act=nn.SiLU()), nn.LayerNorm(dim))
self.embed_bnd = partial(bessel, start=0, end=cutoff, num_basis=dim)
self.embed_ang = self.embed_ang_with_dihedral if dihedral else self.embed_ang_without_dihedral

def embed_ang_with_dihedral(self, x_ang, mask_dih_ang):
cos_ang = torch.cos(x_ang)
sin_ang = torch.sin(x_ang)

h_ang = torch.zeros([len(x_ang), self.dim], device=x_ang.device)
h_ang[~mask_dih_ang, :self.dim // 2] = gaussian(cos_ang[~mask_dih_ang], start=-1, end=1,
num_basis=self.dim // 2)

h_cos_ang = gaussian(cos_ang[mask_dih_ang], start=-1, end=1, num_basis=self.dim // 4)
h_sin_ang = gaussian(sin_ang[mask_dih_ang], start=-1, end=1, num_basis=self.dim // 4)
h_ang[mask_dih_ang, self.dim // 2:] = torch.cat([h_cos_ang, h_sin_ang], dim=-1)

return h_ang

def embed_ang_without_dihedral(self, x_ang, mask_dih_ang):
cos_ang = torch.cos(x_ang)
return gaussian(cos_ang, start=-1, end=1, num_basis=self.dim)

def forward(self, data):
# Embed atoms
data.h_atm = self.embed_atm(data.x_atm)

# Embed bonds
data.h_bnd = self.embed_bnd(data.x_bnd)

# Embed angles
data.h_ang = self.embed_ang(data.x_ang, data.mask_dih_ang)

return data

class Processor(nn.Module):
"""ALIGNN Processor.
The processor updates atom, bond, and angle embeddings.
"""

def __init__(self, num_convs, dim):
super().__init__()
self.num_convs = num_convs
self.dim = dim

self.atm_bnd_convs = nn.ModuleList([copy.deepcopy(GatedGCN(dim, dim)) for _ in range(num_convs)])
self.bnd_ang_convs = nn.ModuleList([copy.deepcopy(GatedGCN(dim, dim)) for _ in range(num_convs)])

def forward(self, data):
edge_index_G = data.edge_index_G
edge_index_A = data.edge_index_A

for i in range(self.num_convs):
data.h_bnd, data.h_ang = self.bnd_ang_convs[i](data.h_bnd, edge_index_A, data.h_ang)
data.h_atm, data.h_bnd = self.atm_bnd_convs[i](data.h_atm, edge_index_G, data.h_bnd)

return data

class Decoder(nn.Module):
def __init__(self, node_dim, out_dim):
super().__init__()
self.node_dim = node_dim
self.out_dim = out_dim
self.decoder = MLP([node_dim, node_dim, out_dim], act=nn.SiLU())

def forward(self, data):
return self.decoder(data.h_atm)

class ALIGNN(nn.Module):
"""ALIGNN model.
Can optinally encode dihedral angles.
"""

def __init__(self, encoder, processor, decoder):
super().__init__()
self.encoder = encoder
self.processor = processor
self.decoder = decoder

def forward(self, data):
data = self.encoder(data)
data = self.processor(data)
return self.decoder(data)
Обучающий файл

Код: Выделить всё

from tqdm.notebook import trange
from datetime import datetime
import glob
import sys
import os

def train(loader,model,parameters,PIN_MEMORY=False):
model.train()
total_loss = 0.0
model = nn.DataParallel(model, device_ids=[0,  1]).cuda()
#model = model.to(parameters['device'])
optimizer = torch.optim.AdamW(model.module.processor.parameters(), lr=parameters['LEARN_RATE'])

#model = model.to(parameters['device'])
loss_fn = torch.nn.MSELoss()
for i,data in enumerate(loader, 0):
optimizer.zero_grad(set_to_none=True)
#data = data.to(parameters['device'], non_blocking=PIN_MEMORY)
data = data.cuda()
#encoding = model.encoder(data)
#proc = model.processor(encoding.module)
#atom_contrib, bond_contrib, angle_contrib = model.decoder(proc.module)
atom_contrib, bond_contrib, angle_contrib = model(data)

all_sum = atom_contrib.sum() + bond_contrib.sum() + angle_contrib.sum()

loss = loss_fn(all_sum, data.y[0][0])
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(loader)

def run_training(data,parameters,model):
follow_batch = ['x_atm', 'x_bnd', 'x_ang'] if hasattr(data['training'][0], 'x_ang') else ['x_atm']
loader_train = DataLoader(data['training'], batch_size=parameters['BATCH_SIZE'], shuffle=True, follow_batch=follow_batch)
loader_valid = DataLoader(data['validation'], batch_size=parameters['BATCH_SIZE'], shuffle=False)

L_train, L_valid = [], []
min_loss_train = 1.0E30
min_loss_valid = 1.0E30

stats_file = open(os.path.join(parameters['model_dir'],'loss.data'),'w')
stats_file.write('Training_loss     Validation loss\n')
stats_file.close()
for ep in range(parameters['num_epochs']):
stats_file = open(os.path.join(parameters['model_dir'], 'loss.data'), 'a')
print('Epoch ',ep,' of ',parameters['num_epochs'])
sys.stdout.flush()
loss_train = train(loader_train, model, parameters);
L_train.append(loss_train)
loss_valid = test_non_intepretable(loader_valid, model, parameters)
L_valid.append(loss_valid)
stats_file.write(str(loss_train) + '     ' + str(loss_valid) + '\n')
if loss_train < min_loss_train:
min_loss_train = loss_train
if loss_valid < min_loss_valid:
min_loss_valid = loss_valid
if parameters['remove_old_model']:
model_name = glob.glob(os.path.join(parameters['model_dir'], 'model_*'))
if len(model_name) > 0:
os.remove(model_name[0])
now = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
print('Min train loss: ', min_loss_train, ' min valid loss: ', min_loss_valid, ' time: ', now)
torch.save(model.state_dict(), os.path.join(parameters['model_dir'], 'model_' + str(now)))
stats_file.close()
if loss_train < parameters['train_tolerance'] and loss_valid < parameters['train_tolerance']:
print('Validation and training losses satisy set tolerance...exiting training loop...')
break
Существует много других файлов, но я думаю, что они подходят для этой конкретной проблемы, но я буду рад включить их в код, если это необходимо. Мои данные хранятся в виде графика и группируются с помощью DataLoader и выглядят следующим образом (при пакетировании по два за раз):

Код: Выделить всё

Graph_DataBatch(atoms=[2], edge_index_G=[2, 89966], edge_index_A=[2, 1479258], x_atm=[5184, 5], x_atm_batch=[5184], x_atm_ptr=[3], x_bnd=[89966], x_bnd_batch=[89966], x_bnd_ptr=[3], x_ang=[1479258], x_ang_batch=[1479258], x_ang_ptr=[3], mask_dih_ang=[1479258], atm_amounts=[6], bnd_amounts=[6], ang_amounts=[6], y=[179932, 1])
Проблемная строка находится в кодировщике во время его функции пересылки:

Код: Выделить всё

def forward(self, data):
# Embed atoms
data.h_atm = self.embed_atm(data.x_atm)

# Embed bonds
data.h_bnd = self.embed_bnd(data.x_bnd)

# Embed angles
data.h_ang = self.embed_ang(data.x_ang, data.mask_dih_ang)

return data
В частности, data.h_atm = self.embed_atm(data.x_atm). Чтобы кратко объяснить, что я пытаюсь сделать, я загружаю в Dataloader набор графиков, которые группируются, а затем передаются в модель для обучения:

Код: Выделить всё

def train(loader,model,parameters,PIN_MEMORY=False):
model.train()
total_loss = 0.0
model = nn.DataParallel(model, device_ids=[0, 1]).cuda()
#model = model.to(parameters['device'])
optimizer = torch.optim.AdamW(model.module.processor.parameters(), lr=parameters['LEARN_RATE'])

#model = model.to(parameters['device'])
loss_fn = torch.nn.MSELoss()
for i,data in enumerate(loader, 0):
optimizer.zero_grad(set_to_none=True)
#data = data.to(parameters['device'], non_blocking=PIN_MEMORY)
data = data.cuda()
#encoding = model.encoder(data)
#proc = model.processor(encoding.module)
#atom_contrib, bond_contrib, angle_contrib = model.decoder(proc.module)
atom_contrib, bond_contrib, angle_contrib = model(data)
Насколько я понимаю, я отправил пакетные данные в графический процессор, а параметры моей модели находятся в графическом процессоре, и DataParallel должен позаботиться о разделении моих данных и автоматической отправке всех данных на каждый графический процессор.
Мой вопрос можно разбить на несколько частей: (1) Верно ли это понимание? (2) Действительно ли мой код выглядит так, как будто он делает это, и (3) имеет ли эта ошибка какое-либо отношение к этому, и если нет, то о чем эта ошибка пытается мне сказать? Я не ожидаю, что кто-нибудь исправит мой код за меня, но мне хотелось бы понять, почему возникает эта ошибка, потому что я думаю, что неправильно понимаю основную логику того, как DataParallel принимает мои данные и отправляет их в графический процессор. Я рад предоставить любую информацию, которая может вам понадобиться, чтобы лучше понять эту проблему.
Я попытался лучше понять разрывающуюся строку: data.h_atm = self.embed_atm(data.x_atm ) распечатав, где на самом деле находится data.x_atm внутри функции пересылки, что должно быть после того, как DataParallel разделил данные, и я получаю это для всех тензоров:

Код: Выделить всё

tensor([[1., 0., 0., 0., 0.],

Код: Выделить всё

[1., 0., 0., 0., 0.],

Код: Выделить всё

[1., 0., 0., 0., 0.],

Код: Выделить всё

[0., 0., 1., 0., 0.],

Код: Выделить всё

[0., 0., 1., 0., 0.],

Код: Выделить всё

[0., 0., 1., 0., 0.]], device='cuda:0')
Я думаю, это говорит мне о том, что все мои данные находятся на графическом процессоре 0, несмотря на то, что модель находится как на графическом процессоре 0, так и на 1 (при работе на двух графических процессорах), что я и делаю. Я подтвердил это, используя nvidia-smi и наблюдая, как оба графических процессора потребляют примерно половину памяти. Я также пробовал различные комбинации вызова X.to_device('cuda') или X.cuda(), где X - данные моего графика, но, похоже, ничто не имеет никакого значения для тензорной распечатки. .

Подробнее здесь: https://stackoverflow.com/questions/787 ... taparallel
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»