`TypeError: Unsupported integer size (0)` when trying to save a custom Keras model

Post by Anonymous »

I have a Seq2Seq subclass of `tensorflow.keras.Model` with custom layers. However, when I try to save it with the `tensorflow.keras.Model.save()` method, the following error is raised:
File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
tid = h5t.py_create(dtype, logical=1)
File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)

Process finished with exit code 1

As far as I understand, the problem is that the HDF5 format tries to serialize a layer or configuration parameter that it does not recognize or cannot handle.
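
To narrow down which configuration entry the writer chokes on, a quick diagnostic (a sketch, not a fix; it only assumes `model` is the built instance from the script below) is to round-trip each entry of `get_config()` through JSON and see which key fails:

import json

# Probe each top-level config entry; whatever fails to serialize here
# is a likely candidate for what the Keras/HDF5 writer rejects too.
config = model.get_config()
for key, value in config.items():
    try:
        json.dumps(value)
        print(f"OK   {key}")
    except TypeError as exc:
        print(f"FAIL {key}: {exc}")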
Minimal reproducible example script
#!/usr/bin/env python3

import numpy as np
from models.seq2seq import RetrosynthesisSeq2SeqModel
from tensorflow.keras.optimizers import Adam

input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2

model = RetrosynthesisSeq2SeqModel(
    input_vocab_size=input_vocab_size,
    output_vocab_size=output_vocab_size,
    encoder_embedding_dim=encoder_embedding_dim,
    decoder_embedding_dim=decoder_embedding_dim,
    units=units,
    dropout_rate=dropout_rate
)

encoder_input_shape = (1, 20) # (batch_size, sequence_length)
decoder_input_shape = (1, 20) # (batch_size, sequence_length)

model.build([encoder_input_shape, decoder_input_shape])

sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))

learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)

model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")

Seq2Seq model
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from encoders.lstm_encoders import StackedBidirectionalLSTMEncoder
from decoders.lstm_decoders import StackedLSTMDecoder
from typing import Optional, Any, Tuple

class RetrosynthesisSeq2SeqModel(Model):
    def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
                 decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
        super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)

        # Save the number of units (neurons)
        self.units: int = units

        # Encoder layer
        self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
            input_vocab_size, encoder_embedding_dim, units, dropout_rate
        )

        # Decoder layer
        self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
            output_vocab_size, decoder_embedding_dim, units, dropout_rate
        )

        # Save the vocabulary sizes
        self.input_vocab_size: int = input_vocab_size
        self.output_vocab_size: int = output_vocab_size

        # Mapping encoder final states to decoder initial states
        self.enc_state_h: Dense = Dense(units, name='enc_state_h')
        self.enc_state_c: Dense = Dense(units, name='enc_state_c')

        # Store the data processors (to be set externally)
        self.encoder_data_processor: Optional[Any] = None
        self.decoder_data_processor: Optional[Any] = None

        # Save the dropout rate
        self.dropout_rate: float = dropout_rate

    def build(self, input_shape):
        # Define the input shapes for encoder and decoder
        encoder_input_shape, decoder_input_shape = input_shape

        # Pass a dummy input through encoder and decoder to initialize weights
        encoder_dummy = tf.zeros(encoder_input_shape)
        decoder_dummy = tf.zeros(decoder_input_shape)

        # Forward pass to build the model
        self.call((encoder_dummy, decoder_dummy), training=False)

        # Mark the model as built
        super(RetrosynthesisSeq2SeqModel, self).build(input_shape)

    def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
        """
        Forward pass of the Seq2Seq model.

        Args:
            inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
            training (Optional[bool], optional): Training flag. Defaults to None.

        Returns:
            tf.Tensor: The output predictions from the decoder.
        """
        # Extract encoder and decoder inputs
        encoder_input, decoder_input = inputs

        # Encoder
        encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)

        # Map encoder final states to decoder initial states
        decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h)  # (batch_size, units)
        decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c)  # (batch_size, units)
        decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)

        # Prepare decoder inputs as a tuple
        decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
            decoder_input,
            decoder_initial_state,
            encoder_output
        )

        # Extract encoder mask
        encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)

        # Decoder
        output: tf.Tensor = self.decoder.call(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )

        return output

    def get_config(self) -> dict:
        config = super(RetrosynthesisSeq2SeqModel, self).get_config()
        config.update({
            'units': self.units,
            'input_vocab_size': self.input_vocab_size,
            'output_vocab_size': self.output_vocab_size,
            'encoder_embedding_dim': self.encoder.embedding.output_dim,
            'decoder_embedding_dim': self.decoder.embedding.output_dim,
            'dropout_rate': self.dropout_rate,
            'encoder': tf.keras.layers.serialize(self.encoder),
            'decoder': tf.keras.layers.serialize(self.decoder),
            'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
            'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
        # Deserialize layers
        config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
        config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
        config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
        config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
        return cls(**config)
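
Note that Keras tracks sublayers assigned in `__init__` and restores their weights itself, so a common alternative (a sketch, assuming the constructor signature above stays as-is) is to keep `get_config()` down to the constructor hyperparameters and let `__init__` rebuild the sublayers, rather than serializing each layer into the config:

    # Inside RetrosynthesisSeq2SeqModel — a leaner variant of the two
    # methods above: only constructor arguments go into the config.
    def get_config(self) -> dict:
        config = super().get_config()
        config.update({
            'input_vocab_size': self.input_vocab_size,
            'output_vocab_size': self.output_vocab_size,
            'encoder_embedding_dim': self.encoder.embedding.output_dim,
            'decoder_embedding_dim': self.decoder.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
        return cls(**config)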

Encoder layer
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from encoders.encoder_interface import EncoderInterface
from typing import Tuple, Optional

class StackedBidirectionalLSTMEncoder(Layer):
    def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
        super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
        self.units: int = units
        self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
        self.dropout_rate: float = dropout_rate

        self.bidirectional_lstm_1: Bidirectional = Bidirectional(
            LSTM(units, return_sequences=True, return_state=True),
            name='bidirectional_lstm_1'
        )

        self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')

        self.bidirectional_lstm_2: Bidirectional = Bidirectional(
            LSTM(units, return_sequences=True, return_state=True),
            name='bidirectional_lstm_2'
        )

        self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')

    def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
        # Embed the input and obtain mask
        encoder_output: tf.Tensor = self.embedding(encoder_input)
        mask = self.embedding.compute_mask(encoder_input)

        # Process through encoder layers
        # First LSTM layer
        encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
            encoder_output, mask=mask, training=training
        )
        # Concatenate forward and backward states
        state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
        state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

        # Apply dropout
        encoder_output: tf.Tensor = self.dropout_1(encoder_output, training=training)

        # Second LSTM layer
        encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
            encoder_output, mask=mask, training=training
        )

        # Concatenate forward and backward states
        state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
        state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

        # Apply dropout
        encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)

        # Final states
        final_state_h: tf.Tensor = state_h_2
        final_state_c: tf.Tensor = state_c_2

        return encoder_output, final_state_h, final_state_c

    def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
        return self.embedding.compute_mask(inputs, mask)

    def get_config(self) -> dict:
        config = super(StackedBidirectionalLSTMEncoder, self).get_config()
        config.update({
            'vocab_size': self.embedding.input_dim,
            'encoder_embedding_dim': self.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
            'dropout_1': tf.keras.layers.serialize(self.dropout_1),
            'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
            'dropout_2': tf.keras.layers.serialize(self.dropout_2),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
        # Deserialize layers
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
        config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
        config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
        config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
        return cls(**config)
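
A quick shape check for the encoder in isolation (a sketch; it assumes the class above is importable and reuses the hyperparameters from the minimal script):

import numpy as np

enc = StackedBidirectionalLSTMEncoder(1000, 32, 128)
out, h, c = enc(np.random.randint(0, 1000, size=(2, 20)))
# Bidirectional LSTM doubles the feature dimension:
print(out.shape, h.shape, c.shape)  # (2, 20, 256) (2, 256) (2, 256)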

Decoder layer
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from decoders.decoder_interface import DecoderInterface
from attention.attention import BahdanauAttention
from typing import List, Optional, Tuple, Union, Any

class StackedLSTMDecoder(Layer):
    def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
                 **kwargs) -> None:
        super(StackedLSTMDecoder, self).__init__(**kwargs)
        self.units: int = units
        self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
        self.vocab_size: int = vocab_size
        self.dropout_rate: float = dropout_rate

        # Decoder: 4-layer LSTM without internal Dropout
        # Define LSTM and Dropout layers individually
        self.lstm_decoder_1: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_1'
        )
        self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')

        self.lstm_decoder_2: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_2'
        )
        self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')

        self.lstm_decoder_3: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_3'
        )
        self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')

        self.lstm_decoder_4: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_4'
        )
        self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')

        # Attention Mechanism
        self.attention: BahdanauAttention = BahdanauAttention(units=units)

        # Output layer
        self.dense: Dense = Dense(vocab_size, activation='softmax')

    def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
             mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        # Extract initial state and encoder output from inputs
        decoder_input, initial_state, encoder_output = inputs

        if decoder_input is None or initial_state is None or encoder_output is None:
            raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')

        # Embed the input and extract decoder mask
        decoder_output: tf.Tensor = self.embedding(decoder_input)
        decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)

        # Process through decoder layers
        # First LSTM layer with initial state
        decoder_output, _, _ = self.lstm_decoder_1(
            decoder_output,
            mask=decoder_mask,
            initial_state=initial_state,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)

        # Second LSTM layer
        decoder_output, _, _ = self.lstm_decoder_2(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)

        # Third LSTM layer
        decoder_output, _, _ = self.lstm_decoder_3(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)

        # Fourth LSTM layer
        decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)

        # Extract only the encoder_mask from the mask list
        if mask is not None and isinstance(mask, (list, tuple)):
            encoder_mask = mask[1]
        else:
            encoder_mask = mask

        # Apply attention
        context_vector, attention_weights = self.attention(
            inputs=[encoder_output, decoder_output],
            mask=encoder_mask
        )

        # Concatenate decoder outputs and context vector
        concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1)  # (batch_size, seq_len_dec, units + units_enc)

        # Generate outputs
        decoder_output: tf.Tensor = self.dense(concat_output)  # (batch_size, seq_len_dec, vocab_size)

        return decoder_output

    @staticmethod
    def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
        return None

    def get_config(self) -> dict:
        config = super(StackedLSTMDecoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'decoder_embedding_dim': self.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
            'dropout_1': tf.keras.layers.serialize(self.dropout_1),
            'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
            'dropout_2': tf.keras.layers.serialize(self.dropout_2),
            'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
            'dropout_3': tf.keras.layers.serialize(self.dropout_3),
            'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
            'dropout_4': tf.keras.layers.serialize(self.dropout_4),
            'attention': tf.keras.layers.serialize(self.attention),
            'dense': tf.keras.layers.serialize(self.dense),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
        # Deserialize layers
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
        config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
        config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
        config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
        config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
        config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
        config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
        config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
        config['attention'] = tf.keras.layers.deserialize(config['attention'])
        config['dense'] = tf.keras.layers.deserialize(config['dense'])
        return cls(**config)
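
A similar shape check for the decoder (a sketch; the zero tensors only stand in for the mapped encoder states and encoder output, whose shapes follow from the model's `call` above):

import numpy as np
import tensorflow as tf

dec = StackedLSTMDecoder(1000, 64, 128)
decoder_input = np.random.randint(0, 1000, size=(2, 20))
initial_state = (tf.zeros((2, 128)), tf.zeros((2, 128)))  # (h, c) after the Dense mapping
encoder_output = tf.zeros((2, 20, 256))                   # bidirectional encoder output
probs = dec((decoder_input, initial_state, encoder_output))
print(probs.shape)  # (2, 20, 1000)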

Attention layer
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union

class BahdanauAttention(Layer):
    def __init__(self, units: int, **kwargs):
        super(BahdanauAttention, self).__init__(**kwargs)
        self.units: int = units
        self.attention_dense1: Dense = Dense(units, name='attention_dense1')
        self.attention_dense2: Dense = Dense(units, name='attention_dense2')
        self.attention_v: Dense = Dense(1, name='attention_v')
        self.supports_masking: bool = True

    def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
             training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
        # Unpack inputs
        encoder_output, decoder_output = inputs

        # Attention Mechanism
        # Calculate attention scores
        # Expand dimensions to match the shapes for broadcasting
        encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output, 1)  # (batch_size, 1, seq_len_encoder, units*2)
        decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output, 2)  # (batch_size, seq_len_decoder, 1, units)

        # Compute the attention scores
        score: tf.Tensor = tf.nn.tanh(
            self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
        )  # (batch_size, seq_len_decoder, seq_len_encoder, units)

        # Apply mask if available
        if mask is not None:
            # If mask is a list or tuple, both encoder and decoder mask have been passed.
            # Extract the encoder mask
            if isinstance(mask, (list, tuple)):
                encoder_mask: tf.Tensor = mask[0]
            else:
                encoder_mask = mask
            if encoder_mask is not None:
                # mask shape: (batch_size, seq_len_encoder)
                # Expand mask to match score dimensions
                encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype)  # (batch_size, 1, seq_len_encoder)
                encoder_mask = tf.expand_dims(encoder_mask, -1)  # (batch_size, 1, seq_len_encoder, 1)
                # Add a large negative value to masked positions to nullify their effect after softmax
                score += (1.0 - encoder_mask) * -1e9

        attention_weights: tf.Tensor = tf.nn.softmax(self.attention_v(score), axis=2)  # (batch_size, seq_len_decoder, seq_len_encoder, 1)

        # Compute the context vector
        context_vector: tf.Tensor = attention_weights * encoder_output_expanded  # (batch_size, seq_len_decoder, seq_len_encoder, units*2)
        context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2)  # (batch_size, seq_len_decoder, units*2)

        return context_vector, attention_weights

    @staticmethod
    def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
        # This layer does not propagate the mask further
        return None

    def get_config(self) -> dict:
        config = super(BahdanauAttention, self).get_config()
        config.update({
            'units': self.units,
            'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
            'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
            'attention_v': tf.keras.layers.serialize(self.attention_v),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'BahdanauAttention':
        # Deserialize layers
        config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
        config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
        config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
        return cls(**config)
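
Once saving succeeds, loading the file back needs the custom classes in scope. A usage sketch via `custom_objects` (redundant if the classes are registered with the decorator shown earlier):

import tensorflow as tf
from models.seq2seq import RetrosynthesisSeq2SeqModel
from encoders.lstm_encoders import StackedBidirectionalLSTMEncoder
from decoders.lstm_decoders import StackedLSTMDecoder
from attention.attention import BahdanauAttention

restored = tf.keras.models.load_model(
    'minimal_seq2seq_model.keras',
    custom_objects={
        'RetrosynthesisSeq2SeqModel': RetrosynthesisSeq2SeqModel,
        'StackedBidirectionalLSTMEncoder': StackedBidirectionalLSTMEncoder,
        'StackedLSTMDecoder': StackedLSTMDecoder,
        'BahdanauAttention': BahdanauAttention,
    },
)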


More details here: https://stackoverflow.com/questions/790 ... m-keras-mo