LSTM AE прогнозирует постоянный выходPython

Программы на Python
Anonymous
 LSTM AE прогнозирует постоянный выход

Сообщение Anonymous »

Я делаю LSTM AE, чтобы предсказать HR в велосипедных тренировках. Он принимает 6 измерений датчиков (ЧСС, мощность, ...) за шаг времени временного ряда и декодирует это в прогнозируемый HR. Однако есть две проблемы: < /p>

Тренировки не имеют аналогичной длины. Я попытался отредактировать код, чтобы учесть маскировку, но это сделало код невероятно медленным. Любые предложения по этому поводу приветствуются! < /Li>
Вывод является почти постоянным прогнозом. Почему он не пытается имитировать временные ряды? Как я могу это улучшить? : < /p>

DataSet = 443 тренировки с 1,593,961 Общее время шага < /li>
Размер партии = 128 < /li> Epochs = 2500 (с ранним терпением 5, часто вызываемым около 10 эпох) < /li>
Оптимизатор = Адам с скоростью обучения 0,001 < /li>
скрытый размер = 128 < /li>
< /ul>
форма ввода: (#num тренировки, #max тренировочная длина, #sensors) < /p>
< P> код для LSTM AE: < /p>
import torch.nn as nn

# Encoder Class
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size, dropout, seq_len):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.dropout = dropout
self.seq_len = seq_len

self.lstm_enc = nn.LSTM(input_size=input_size, hidden_size=hidden_size, dropout=dropout, batch_first=True)

def forward(self, x):
out, (last_h_state, last_c_state) = self.lstm_enc(x)
x_enc = last_h_state.squeeze(dim=0)
x_enc = x_enc.unsqueeze(1).repeat(1, x.shape[1], 1)
return x_enc, out

# Decoder Class
class Decoder(nn.Module):
def __init__(self, input_size, hidden_size, dropout, seq_len, use_act):
super(Decoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.dropout = dropout
self.seq_len = seq_len
self.use_act = use_act # Parameter to control the last sigmoid activation - depends on the normalization used.
self.act = nn.Sigmoid()

self.lstm_dec = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_size, 1)

def forward(self, z):
# z = z.unsqueeze(1).repeat(1, self.seq_len, 1)
dec_out, (hidden_state, cell_state) = self.lstm_dec(z)
dec_out = self.fc(dec_out)
if self.use_act:
dec_out = self.act(dec_out)

return dec_out, hidden_state

# LSTM Auto-Encoder Class
class LSTMAE(nn.Module):
def __init__(self, input_size, hidden_size, dropout_ratio, seq_len, use_act=True):
super(LSTMAE, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.dropout_ratio = dropout_ratio
self.seq_len = seq_len

self.encoder = Encoder(input_size=input_size, hidden_size=hidden_size, dropout=dropout_ratio, seq_len=seq_len)
self.decoder = Decoder(input_size=input_size, hidden_size=hidden_size, dropout=dropout_ratio, seq_len=seq_len, use_act=use_act)

def forward(self, x, return_last_h=False, return_enc_out=False):
x_enc, enc_out = self.encoder(x)
x_dec, last_h = self.decoder(x_enc)

if return_last_h:
return x_dec, last_h
elif return_enc_out:
return x_dec, enc_out
return x_dec
< /code>
код для обучения и оценки: < /p>
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_model(criterion, epoch, model, model_type, optimizer, train_iter, batch_size, clip_val, log_interval, scheduler=None):
"""
Function to run training epoch
"""
model.train()
loss_sum = 0
pred_loss_sum = 0
correct_sum = 0

num_samples_iter = 0
for batch_idx, data in enumerate(train_iter, 1):
if len(data) == 2:
data, labels = data[0].float().to(device), data[1].float().to(device)
else:
data = data.float().to(device)
# Zero gradients
optimizer.zero_grad()

num_samples_iter += len(data)

target = data[:, :, 0] # Shape: (batch_size, seq_len)

# Forward pass & loss calculation
model_out = model(data)
model_out = model_out.squeeze(-1) # Remove the last dimension: (batch_size, seq_len)
loss = criterion(model_out, target)

# Backward pass
loss.backward()
loss_sum += loss.item()

# Gradient clipping
if clip_val is not None:
torch.nn.utils.clip_grad_norm_(model.parameters(), clip_val)

# Update model params
optimizer.step()

# LR scheduler step
if scheduler is not None:
scheduler.step()

# Print progress
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, num_samples_iter, len(train_iter.dataset),
100. * num_samples_iter / len(train_iter.dataset), loss_sum / num_samples_iter))

train_loss = loss_sum / len(train_iter.dataset)
train_pred_loss = pred_loss_sum / len(train_iter.dataset)
train_acc = round(correct_sum / len(train_iter.dataset) * 100, 2)
print(f'Train Average Loss: {train_loss}')

return train_loss, train_acc, train_pred_loss

def eval_model(criterion, model, model_type, val_iter, mode='Validation'):
"""
Function to run validation on given model
"""
model.eval()
loss_sum = 0
correct_sum = 0
with torch.no_grad():
for data in val_iter:
if len(data) == 2:
data, labels = data[0].float().to(device), data[1].float().to(device)
else:
data = data.float().to(device)

target = data[:, :, 0] # Shape: (batch_size, seq_len)

# Forward pass & loss calculation
model_out = model(data)
model_out = model_out.squeeze(-1) # Remove the last dimension: (batch_size, seq_len)
loss = criterion(model_out, target)

loss_sum += loss.item()

val_loss = loss_sum / len(val_iter.dataset)
val_acc = round(correct_sum / len(val_iter.dataset) * 100, 2)
print(f'{mode}: Average Loss: {val_loss}')
return val_loss, val_acc



Подробнее здесь: https://stackoverflow.com/questions/793 ... ant-output

Вернуться в «Python»