`TypeError: Неподдерживаемый целочисленный размер (0)` при попытке сохранить пользовательскую модель KerasPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 `TypeError: Неподдерживаемый целочисленный размер (0)` при попытке сохранить пользовательскую модель Keras

Сообщение Anonymous »

У меня есть подкласс модели tensorflow.keras.Model Seq2Seq с настраиваемыми слоями. Однако когда я пытаюсь сохранить его с помощью метода tensorflow.keras.Model.save(), выдается следующая ошибка:
File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
tid = h5t.py_create(dtype, logical=1)
File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)

Process finished with exit code 1

Насколько я понимаю, эта проблема связана с тем, что формат HDF5 пытается сериализовать слой или параметр конфигурации, который он не распознает или не может обработать.
Моя версия Tensorflow — 2.17.0
Код для минимально воспроизводимого примера
Слой кодировщика
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional

class StackedBidirectionalLSTMEncoder(Layer):
def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
self.dropout_rate: float = dropout_rate

self.bidirectional_lstm_1: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_1'
)

self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')

self.bidirectional_lstm_2: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_2'
)

self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')

def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
# Embed the input and obtain mask
encoder_output: tf.Tensor = self.embedding(encoder_input)
mask = self.embedding.compute_mask(encoder_input)

# Process through encoder layers
# First LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

# Apply dropout
encoder_output: Optional[tf.Tensor] = self.dropout_1(encoder_output, training=training)

# Second LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
encoder_output, mask=mask, training=training
)

# Concatenate forward and backward states
state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

# Apply dropout
encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)

# Final states
final_state_h: tf.Tensor = state_h_2
final_state_c: tf.Tensor = state_c_2

return encoder_output, final_state_h, final_state_c

def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
return self.embedding.compute_mask(inputs, mask)

def get_config(self) -> dict:
config = super(StackedBidirectionalLSTMEncoder, self).get_config()
config.update({
'vocab_size': self.embedding.input_dim,
'encoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
})
return config

@classmethod
def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
return cls(**config)

Слой декодера
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any

class StackedLSTMDecoder(Layer):
def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
**kwargs) -> None:
super(StackedLSTMDecoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
self.vocab_size: int = vocab_size
self.dropout_rate: float = dropout_rate

# Decoder: 4-layer LSTM without internal Dropout
# Define LSTM and Dropout layers individually
self.lstm_decoder_1: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')

self.lstm_decoder_2: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')

self.lstm_decoder_3: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_3'
)
self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')

self.lstm_decoder_4: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_4'
)
self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')

# Attention Mechanism
self.attention: BahdanauAttention = BahdanauAttention(units=units)

# Output layer
self.dense: Dense = Dense(vocab_size, activation='softmax')

def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
mask: Optional[tf.Tensor] = None) -> tf.Tensor:
# Extract initial state and encoder output from inputs
decoder_input, initial_state, encoder_output = inputs

if decoder_input is None or initial_state is None or encoder_output is None:
raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')

# Embed the input and extract decoder mask
decoder_output: tf.Tensor = self.embedding(decoder_input)
decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)

# Process through decoder layers
# First LSTM layer with initial state
decoder_output, _, _ = self.lstm_decoder_1(
decoder_output,
mask=decoder_mask,
initial_state=initial_state,
training=training
)
decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)

# Second LSTM layer
decoder_output, _, _ = self.lstm_decoder_2(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)

# Third LSTM layer
decoder_output, _, _ = self.lstm_decoder_3(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)

# Fourth LSTM layer
decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)

# Extract only the encoder_mask from the mask list
if mask is not None and isinstance(mask, (list, tuple)):
encoder_mask = mask[1]
else:
encoder_mask = mask

# Apply attention
context_vector, attention_weights = self.attention(
inputs=[encoder_output, decoder_output],
mask=encoder_mask
)

# Concatenate decoder outputs and context vector
concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1) # (batch_size, seq_len_dec, units + units_enc)

# Generate outputs
decoder_output: tf.Tensor = self.dense(concat_output) # (batch_size, seq_len_dec, vocab_size)

return decoder_output

@staticmethod
def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
return None

def get_config(self) -> dict:
config = super(StackedLSTMDecoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'decoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
'dropout_3': tf.keras.layers.serialize(self.dropout_3),
'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
'dropout_4': tf.keras.layers.serialize(self.dropout_4),
'attention': tf.keras.layers.serialize(self.attention),
'dense': tf.keras.layers.serialize(self.dense),
})
return config

@classmethod
def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
config['attention'] = tf.keras.layers.deserialize(config['attention'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
return cls(**config)

Уровень внимания
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union

class BahdanauAttention(Layer):
def __init__(self, units: int, **kwargs):
super(BahdanauAttention, self).__init__(**kwargs)
self.units: int = units
self.attention_dense1: Dense = Dense(units, name='attention_dense1')
self.attention_dense2: Dense = Dense(units, name='attention_dense2')
self.attention_v: Dense = Dense(1, name='attention_v')
self.supports_masking: bool = True

def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
# Unpack inputs
encoder_output, decoder_output = inputs

# Attention Mechanism
# Calculate attention scores
# Expand dimensions to match the shapes for broadcasting
encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output,
1) # Shape: (batch_size, 1, seq_len_encoder, units*2)
decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output,
2) # Shape: (batch_size, seq_len_decoder, 1, units)

# Compute the attention scores
score: tf.Tensor = tf.nn.tanh(
self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)

# Apply mask if available
if mask is not None:
# If mask is a list or tuple, both encoder and decoder mask have been passed.
# Extract the encoder mask
if isinstance(mask, (list, tuple)):
encoder_mask: tf.Tensor = mask[0]
else:
encoder_mask = mask
if encoder_mask is not None:
# mask shape: (batch_size, seq_len_encoder)
# Expand mask to match score dimensions
encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype) # (batch_size, 1, seq_len_encoder)
encoder_mask = tf.expand_dims(encoder_mask, -1) # (batch_size, 1, seq_len_encoder, 1)
# Add a large negative value to masked positions to nullify their effect after softmax
score += (1.0 - encoder_mask) * -1e9

attention_weights: tf.Tensor = tf.nn.softmax(self.attention_v(score),
axis=2) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)

# Compute the context vector
context_vector: tf.Tensor = attention_weights * encoder_output_expanded # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2) # Shape: (batch_size, seq_len_decoder, units*2)

return context_vector, attention_weights

@staticmethod
def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
# This layer does not propagate the mask further
return None

def get_config(self) -> dict:
config = super(BahdanauAttention, self).get_config()
config.update({
'units': self.units,
'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
'attention_v': tf.keras.layers.serialize(self.attention_v),
})
return config

@classmethod
def from_config(cls, config: dict) -> 'BahdanauAttention':
# Deserialize layers
config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
return cls(**config)

Модель Seq2Seq
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from typing import Optional, Any, Tuple

class RetrosynthesisSeq2SeqModel(Model):
def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)

# Save the number of units (neurons)
self.units: int = units

# Encoder layer
self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
input_vocab_size, encoder_embedding_dim, units, dropout_rate
)

# Decoder layer
self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
output_vocab_size, decoder_embedding_dim, units, dropout_rate
)

# Save the vocabulary sizes
self.input_vocab_size: int = input_vocab_size
self.output_vocab_size: int = output_vocab_size

# Mapping encoder final states to decoder initial states
self.enc_state_h: Dense = Dense(units, name='enc_state_h')
self.enc_state_c: Dense = Dense(units, name='enc_state_c')

# Store the data processors (to be set externally)
self.encoder_data_processor: Optional[Any] = None
self.decoder_data_processor: Optional[Any] = None

# Save the dropout rate
self.dropout_rate: float = dropout_rate

def build(self, input_shape):
# Define the input shapes for encoder and decoder
encoder_input_shape, decoder_input_shape = input_shape

# Pass a dummy input through encoder and decoder to initialize weights
encoder_dummy = tf.zeros(encoder_input_shape)
decoder_dummy = tf.zeros(decoder_input_shape)

# Forward pass to build the model
self.call((encoder_dummy, decoder_dummy), training=False)

# Mark the model as built
super(RetrosynthesisSeq2SeqModel, self).build(input_shape)

def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
"""
Forward pass of the Seq2Seq model.

Args:
inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
training (Optional[bool], optional): Training flag. Defaults to None.

Returns:
tf.Tensor: The output predictions from the decoder.
"""
# Extract encoder and decoder inputs
encoder_input, decoder_input = inputs

# Encoder
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)

# Map encoder final states to decoder initial states
decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h) # (batch_size, units)
decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c) # (batch_size, units)
decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)

# Prepare decoder inputs as a tuple
decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
decoder_input,
decoder_initial_state,
encoder_output
)

# Extract encoder mask
encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)

# Decoder
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)

return output

def get_config(self) -> dict:
config = super(RetrosynthesisSeq2SeqModel, self).get_config()
config.update({
'units': self.units,
'input_vocab_size': self.input_vocab_size,
'output_vocab_size': self.output_vocab_size,
'encoder_embedding_dim': self.encoder.embedding.output_dim,
'decoder_embedding_dim': self.decoder.embedding.output_dim,
'dropout_rate': self.dropout_rate,
'encoder': tf.keras.layers.serialize(self.encoder),
'decoder': tf.keras.layers.serialize(self.decoder),
'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
})
return config

@classmethod
def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
# Deserialize layers
config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
return cls(**config)

Минимальный воспроизводимый пример сценария
#!/usr/bin/env python3

import numpy as np
from tensorflow.keras.optimizers import Adam

input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2

model = RetrosynthesisSeq2SeqModel(
input_vocab_size=input_vocab_size,
output_vocab_size=output_vocab_size,
encoder_embedding_dim=encoder_embedding_dim,
decoder_embedding_dim=decoder_embedding_dim,
units=units,
dropout_rate=dropout_rate
)

encoder_input_shape = (1, 20) # (batch_size, sequence_length)
decoder_input_shape = (1, 20) # (batch_size, sequence_length)

model.build([encoder_input_shape, decoder_input_shape])

sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))

learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)

model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")


Подробнее здесь: https://stackoverflow.com/questions/790 ... m-keras-mo
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»