У меня есть подкласс модели tensorflow.keras.Model Seq2Seq с настраиваемыми слоями. Однако когда я пытаюсь сохранить его с помощью метода tensorflow.keras.Model.save(), выдается следующая ошибка:
File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
tid = h5t.py_create(dtype, logical=1)
File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)
Process finished with exit code 1
Насколько я понимаю, эта проблема связана с тем, что формат HDF5 пытается сериализовать слой или параметр конфигурации, который он не распознает или не может обработать.
Моя версия Tensorflow — 2.17.0
Код для минимально воспроизводимого примера
Слой кодировщика
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional
class StackedBidirectionalLSTMEncoder(Layer):
def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
self.dropout_rate: float = dropout_rate
self.bidirectional_lstm_1: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')
self.bidirectional_lstm_2: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')
def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
# Embed the input and obtain mask
encoder_output: tf.Tensor = self.embedding(encoder_input)
mask = self.embedding.compute_mask(encoder_input)
# Process through encoder layers
# First LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: Optional[tf.Tensor] = self.dropout_1(encoder_output, training=training)
# Second LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)
# Final states
final_state_h: tf.Tensor = state_h_2
final_state_c: tf.Tensor = state_c_2
return encoder_output, final_state_h, final_state_c
def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
return self.embedding.compute_mask(inputs, mask)
def get_config(self) -> dict:
config = super(StackedBidirectionalLSTMEncoder, self).get_config()
config.update({
'vocab_size': self.embedding.input_dim,
'encoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
return cls(**config)
Слой декодера
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any
class StackedLSTMDecoder(Layer):
def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
**kwargs) -> None:
super(StackedLSTMDecoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
self.vocab_size: int = vocab_size
self.dropout_rate: float = dropout_rate
# Decoder: 4-layer LSTM without internal Dropout
# Define LSTM and Dropout layers individually
self.lstm_decoder_1: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')
self.lstm_decoder_2: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')
self.lstm_decoder_3: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_3'
)
self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')
self.lstm_decoder_4: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_4'
)
self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')
# Attention Mechanism
self.attention: BahdanauAttention = BahdanauAttention(units=units)
# Output layer
self.dense: Dense = Dense(vocab_size, activation='softmax')
def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
mask: Optional[tf.Tensor] = None) -> tf.Tensor:
# Extract initial state and encoder output from inputs
decoder_input, initial_state, encoder_output = inputs
if decoder_input is None or initial_state is None or encoder_output is None:
raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')
# Embed the input and extract decoder mask
decoder_output: tf.Tensor = self.embedding(decoder_input)
decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)
# Process through decoder layers
# First LSTM layer with initial state
decoder_output, _, _ = self.lstm_decoder_1(
decoder_output,
mask=decoder_mask,
initial_state=initial_state,
training=training
)
decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)
# Second LSTM layer
decoder_output, _, _ = self.lstm_decoder_2(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)
# Third LSTM layer
decoder_output, _, _ = self.lstm_decoder_3(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)
# Fourth LSTM layer
decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)
# Extract only the encoder_mask from the mask list
if mask is not None and isinstance(mask, (list, tuple)):
encoder_mask = mask[1]
else:
encoder_mask = mask
# Apply attention
context_vector, attention_weights = self.attention(
inputs=[encoder_output, decoder_output],
mask=encoder_mask
)
# Concatenate decoder outputs and context vector
concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1) # (batch_size, seq_len_dec, units + units_enc)
# Generate outputs
decoder_output: tf.Tensor = self.dense(concat_output) # (batch_size, seq_len_dec, vocab_size)
return decoder_output
@staticmethod
def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
return None
def get_config(self) -> dict:
config = super(StackedLSTMDecoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'decoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
'dropout_3': tf.keras.layers.serialize(self.dropout_3),
'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
'dropout_4': tf.keras.layers.serialize(self.dropout_4),
'attention': tf.keras.layers.serialize(self.attention),
'dense': tf.keras.layers.serialize(self.dense),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
config['attention'] = tf.keras.layers.deserialize(config['attention'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
return cls(**config)
Уровень внимания
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union
class BahdanauAttention(Layer):
def __init__(self, units: int, **kwargs):
super(BahdanauAttention, self).__init__(**kwargs)
self.units: int = units
self.attention_dense1: Dense = Dense(units, name='attention_dense1')
self.attention_dense2: Dense = Dense(units, name='attention_dense2')
self.attention_v: Dense = Dense(1, name='attention_v')
self.supports_masking: bool = True
def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
# Unpack inputs
encoder_output, decoder_output = inputs
# Attention Mechanism
# Calculate attention scores
# Expand dimensions to match the shapes for broadcasting
encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output,
1) # Shape: (batch_size, 1, seq_len_encoder, units*2)
decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output,
2) # Shape: (batch_size, seq_len_decoder, 1, units)
# Compute the attention scores
score: tf.Tensor = tf.nn.tanh(
self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)
# Apply mask if available
if mask is not None:
# If mask is a list or tuple, both encoder and decoder mask have been passed.
# Extract the encoder mask
if isinstance(mask, (list, tuple)):
encoder_mask: tf.Tensor = mask[0]
else:
encoder_mask = mask
if encoder_mask is not None:
# mask shape: (batch_size, seq_len_encoder)
# Expand mask to match score dimensions
encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype) # (batch_size, 1, seq_len_encoder)
encoder_mask = tf.expand_dims(encoder_mask, -1) # (batch_size, 1, seq_len_encoder, 1)
# Add a large negative value to masked positions to nullify their effect after softmax
score += (1.0 - encoder_mask) * -1e9
attention_weights: tf.Tensor = tf.nn.softmax(self.attention_v(score),
axis=2) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)
# Compute the context vector
context_vector: tf.Tensor = attention_weights * encoder_output_expanded # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2) # Shape: (batch_size, seq_len_decoder, units*2)
return context_vector, attention_weights
@staticmethod
def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
# This layer does not propagate the mask further
return None
def get_config(self) -> dict:
config = super(BahdanauAttention, self).get_config()
config.update({
'units': self.units,
'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
'attention_v': tf.keras.layers.serialize(self.attention_v),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'BahdanauAttention':
# Deserialize layers
config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
return cls(**config)
Модель Seq2Seq
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from typing import Optional, Any, Tuple
class RetrosynthesisSeq2SeqModel(Model):
def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)
# Save the number of units (neurons)
self.units: int = units
# Encoder layer
self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
input_vocab_size, encoder_embedding_dim, units, dropout_rate
)
# Decoder layer
self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
output_vocab_size, decoder_embedding_dim, units, dropout_rate
)
# Save the vocabulary sizes
self.input_vocab_size: int = input_vocab_size
self.output_vocab_size: int = output_vocab_size
# Mapping encoder final states to decoder initial states
self.enc_state_h: Dense = Dense(units, name='enc_state_h')
self.enc_state_c: Dense = Dense(units, name='enc_state_c')
# Store the data processors (to be set externally)
self.encoder_data_processor: Optional[Any] = None
self.decoder_data_processor: Optional[Any] = None
# Save the dropout rate
self.dropout_rate: float = dropout_rate
def build(self, input_shape):
# Define the input shapes for encoder and decoder
encoder_input_shape, decoder_input_shape = input_shape
# Pass a dummy input through encoder and decoder to initialize weights
encoder_dummy = tf.zeros(encoder_input_shape)
decoder_dummy = tf.zeros(decoder_input_shape)
# Forward pass to build the model
self.call((encoder_dummy, decoder_dummy), training=False)
# Mark the model as built
super(RetrosynthesisSeq2SeqModel, self).build(input_shape)
def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
"""
Forward pass of the Seq2Seq model.
Args:
inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
training (Optional[bool], optional): Training flag. Defaults to None.
Returns:
tf.Tensor: The output predictions from the decoder.
"""
# Extract encoder and decoder inputs
encoder_input, decoder_input = inputs
# Encoder
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
# Map encoder final states to decoder initial states
decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h) # (batch_size, units)
decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c) # (batch_size, units)
decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)
# Prepare decoder inputs as a tuple
decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
decoder_input,
decoder_initial_state,
encoder_output
)
# Extract encoder mask
encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)
# Decoder
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)
return output
def get_config(self) -> dict:
config = super(RetrosynthesisSeq2SeqModel, self).get_config()
config.update({
'units': self.units,
'input_vocab_size': self.input_vocab_size,
'output_vocab_size': self.output_vocab_size,
'encoder_embedding_dim': self.encoder.embedding.output_dim,
'decoder_embedding_dim': self.decoder.embedding.output_dim,
'dropout_rate': self.dropout_rate,
'encoder': tf.keras.layers.serialize(self.encoder),
'decoder': tf.keras.layers.serialize(self.decoder),
'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
})
return config
@classmethod
def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
# Deserialize layers
config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
return cls(**config)
Минимальный воспроизводимый пример сценария
#!/usr/bin/env python3
import numpy as np
from tensorflow.keras.optimizers import Adam
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2
model = RetrosynthesisSeq2SeqModel(
input_vocab_size=input_vocab_size,
output_vocab_size=output_vocab_size,
encoder_embedding_dim=encoder_embedding_dim,
decoder_embedding_dim=decoder_embedding_dim,
units=units,
dropout_rate=dropout_rate
)
encoder_input_shape = (1, 20) # (batch_size, sequence_length)
decoder_input_shape = (1, 20) # (batch_size, sequence_length)
model.build([encoder_input_shape, decoder_input_shape])
sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))
learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)
model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
Подробнее здесь: https://stackoverflow.com/questions/790 ... m-keras-mo
`TypeError: Неподдерживаемый целочисленный размер (0)` при попытке сохранить пользовательскую модель Keras ⇐ Python
Программы на Python
1727526762
Anonymous
У меня есть подкласс модели tensorflow.keras.Model Seq2Seq с настраиваемыми слоями. Однако когда я пытаюсь сохранить его с помощью метода tensorflow.keras.Model.save(), выдается следующая ошибка:
File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
tid = h5t.py_create(dtype, logical=1)
File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)
Process finished with exit code 1
Насколько я понимаю, эта проблема связана с тем, что формат HDF5 пытается сериализовать слой или параметр конфигурации, который он не распознает или не может обработать.
Моя версия Tensorflow — 2.17.0
Код для минимально воспроизводимого примера
Слой кодировщика
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional
class StackedBidirectionalLSTMEncoder(Layer):
def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
self.dropout_rate: float = dropout_rate
self.bidirectional_lstm_1: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')
self.bidirectional_lstm_2: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')
def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
# Embed the input and obtain mask
encoder_output: tf.Tensor = self.embedding(encoder_input)
mask = self.embedding.compute_mask(encoder_input)
# Process through encoder layers
# First LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: Optional[tf.Tensor] = self.dropout_1(encoder_output, training=training)
# Second LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)
# Final states
final_state_h: tf.Tensor = state_h_2
final_state_c: tf.Tensor = state_c_2
return encoder_output, final_state_h, final_state_c
def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
return self.embedding.compute_mask(inputs, mask)
def get_config(self) -> dict:
config = super(StackedBidirectionalLSTMEncoder, self).get_config()
config.update({
'vocab_size': self.embedding.input_dim,
'encoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
return cls(**config)
Слой декодера
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any
class StackedLSTMDecoder(Layer):
def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
**kwargs) -> None:
super(StackedLSTMDecoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
self.vocab_size: int = vocab_size
self.dropout_rate: float = dropout_rate
# Decoder: 4-layer LSTM without internal Dropout
# Define LSTM and Dropout layers individually
self.lstm_decoder_1: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')
self.lstm_decoder_2: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')
self.lstm_decoder_3: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_3'
)
self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')
self.lstm_decoder_4: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_4'
)
self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')
# Attention Mechanism
self.attention: BahdanauAttention = BahdanauAttention(units=units)
# Output layer
self.dense: Dense = Dense(vocab_size, activation='softmax')
def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
mask: Optional[tf.Tensor] = None) -> tf.Tensor:
# Extract initial state and encoder output from inputs
decoder_input, initial_state, encoder_output = inputs
if decoder_input is None or initial_state is None or encoder_output is None:
raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')
# Embed the input and extract decoder mask
decoder_output: tf.Tensor = self.embedding(decoder_input)
decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)
# Process through decoder layers
# First LSTM layer with initial state
decoder_output, _, _ = self.lstm_decoder_1(
decoder_output,
mask=decoder_mask,
initial_state=initial_state,
training=training
)
decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)
# Second LSTM layer
decoder_output, _, _ = self.lstm_decoder_2(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)
# Third LSTM layer
decoder_output, _, _ = self.lstm_decoder_3(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)
# Fourth LSTM layer
decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)
# Extract only the encoder_mask from the mask list
if mask is not None and isinstance(mask, (list, tuple)):
encoder_mask = mask[1]
else:
encoder_mask = mask
# Apply attention
context_vector, attention_weights = self.attention(
inputs=[encoder_output, decoder_output],
mask=encoder_mask
)
# Concatenate decoder outputs and context vector
concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1) # (batch_size, seq_len_dec, units + units_enc)
# Generate outputs
decoder_output: tf.Tensor = self.dense(concat_output) # (batch_size, seq_len_dec, vocab_size)
return decoder_output
@staticmethod
def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
return None
def get_config(self) -> dict:
config = super(StackedLSTMDecoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'decoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
'dropout_3': tf.keras.layers.serialize(self.dropout_3),
'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
'dropout_4': tf.keras.layers.serialize(self.dropout_4),
'attention': tf.keras.layers.serialize(self.attention),
'dense': tf.keras.layers.serialize(self.dense),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
config['attention'] = tf.keras.layers.deserialize(config['attention'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
return cls(**config)
Уровень внимания
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union
class BahdanauAttention(Layer):
def __init__(self, units: int, **kwargs):
super(BahdanauAttention, self).__init__(**kwargs)
self.units: int = units
self.attention_dense1: Dense = Dense(units, name='attention_dense1')
self.attention_dense2: Dense = Dense(units, name='attention_dense2')
self.attention_v: Dense = Dense(1, name='attention_v')
self.supports_masking: bool = True
def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
# Unpack inputs
encoder_output, decoder_output = inputs
# Attention Mechanism
# Calculate attention scores
# Expand dimensions to match the shapes for broadcasting
encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output,
1) # Shape: (batch_size, 1, seq_len_encoder, units*2)
decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output,
2) # Shape: (batch_size, seq_len_decoder, 1, units)
# Compute the attention scores
score: tf.Tensor = tf.nn.tanh(
self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)
# Apply mask if available
if mask is not None:
# If mask is a list or tuple, both encoder and decoder mask have been passed.
# Extract the encoder mask
if isinstance(mask, (list, tuple)):
encoder_mask: tf.Tensor = mask[0]
else:
encoder_mask = mask
if encoder_mask is not None:
# mask shape: (batch_size, seq_len_encoder)
# Expand mask to match score dimensions
encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype) # (batch_size, 1, seq_len_encoder)
encoder_mask = tf.expand_dims(encoder_mask, -1) # (batch_size, 1, seq_len_encoder, 1)
# Add a large negative value to masked positions to nullify their effect after softmax
score += (1.0 - encoder_mask) * -1e9
attention_weights: tf.Tensor = tf.nn.softmax(self.attention_v(score),
axis=2) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)
# Compute the context vector
context_vector: tf.Tensor = attention_weights * encoder_output_expanded # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2) # Shape: (batch_size, seq_len_decoder, units*2)
return context_vector, attention_weights
@staticmethod
def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
# This layer does not propagate the mask further
return None
def get_config(self) -> dict:
config = super(BahdanauAttention, self).get_config()
config.update({
'units': self.units,
'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
'attention_v': tf.keras.layers.serialize(self.attention_v),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'BahdanauAttention':
# Deserialize layers
config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
return cls(**config)
Модель Seq2Seq
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from typing import Optional, Any, Tuple
class RetrosynthesisSeq2SeqModel(Model):
def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)
# Save the number of units (neurons)
self.units: int = units
# Encoder layer
self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
input_vocab_size, encoder_embedding_dim, units, dropout_rate
)
# Decoder layer
self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
output_vocab_size, decoder_embedding_dim, units, dropout_rate
)
# Save the vocabulary sizes
self.input_vocab_size: int = input_vocab_size
self.output_vocab_size: int = output_vocab_size
# Mapping encoder final states to decoder initial states
self.enc_state_h: Dense = Dense(units, name='enc_state_h')
self.enc_state_c: Dense = Dense(units, name='enc_state_c')
# Store the data processors (to be set externally)
self.encoder_data_processor: Optional[Any] = None
self.decoder_data_processor: Optional[Any] = None
# Save the dropout rate
self.dropout_rate: float = dropout_rate
def build(self, input_shape):
# Define the input shapes for encoder and decoder
encoder_input_shape, decoder_input_shape = input_shape
# Pass a dummy input through encoder and decoder to initialize weights
encoder_dummy = tf.zeros(encoder_input_shape)
decoder_dummy = tf.zeros(decoder_input_shape)
# Forward pass to build the model
self.call((encoder_dummy, decoder_dummy), training=False)
# Mark the model as built
super(RetrosynthesisSeq2SeqModel, self).build(input_shape)
def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
"""
Forward pass of the Seq2Seq model.
Args:
inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
training (Optional[bool], optional): Training flag. Defaults to None.
Returns:
tf.Tensor: The output predictions from the decoder.
"""
# Extract encoder and decoder inputs
encoder_input, decoder_input = inputs
# Encoder
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
# Map encoder final states to decoder initial states
decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h) # (batch_size, units)
decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c) # (batch_size, units)
decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)
# Prepare decoder inputs as a tuple
decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
decoder_input,
decoder_initial_state,
encoder_output
)
# Extract encoder mask
encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)
# Decoder
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)
return output
def get_config(self) -> dict:
config = super(RetrosynthesisSeq2SeqModel, self).get_config()
config.update({
'units': self.units,
'input_vocab_size': self.input_vocab_size,
'output_vocab_size': self.output_vocab_size,
'encoder_embedding_dim': self.encoder.embedding.output_dim,
'decoder_embedding_dim': self.decoder.embedding.output_dim,
'dropout_rate': self.dropout_rate,
'encoder': tf.keras.layers.serialize(self.encoder),
'decoder': tf.keras.layers.serialize(self.decoder),
'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
})
return config
@classmethod
def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
# Deserialize layers
config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
return cls(**config)
Минимальный воспроизводимый пример сценария
#!/usr/bin/env python3
import numpy as np
from tensorflow.keras.optimizers import Adam
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2
model = RetrosynthesisSeq2SeqModel(
input_vocab_size=input_vocab_size,
output_vocab_size=output_vocab_size,
encoder_embedding_dim=encoder_embedding_dim,
decoder_embedding_dim=decoder_embedding_dim,
units=units,
dropout_rate=dropout_rate
)
encoder_input_shape = (1, 20) # (batch_size, sequence_length)
decoder_input_shape = (1, 20) # (batch_size, sequence_length)
model.build([encoder_input_shape, decoder_input_shape])
sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))
learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)
model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
Подробнее здесь: [url]https://stackoverflow.com/questions/79032646/typeerror-unsupported-integer-size-0-when-attempted-to-save-custom-keras-mo[/url]
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
`TypeError: Неподдерживаемый целочисленный размер (0)` Ошибка при попытке сохранить пользовательскую модель Keras через
Anonymous » » в форуме PythonУ меня есть следующая подклассовая модель tensorflow.keras.Model Seq2Seq с настраиваемыми слоями:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint,... - 0 Ответы
- 15 Просмотры
-
Последнее сообщение Anonymous
-
-
-
`TypeError: Неподдерживаемый целочисленный размер (0)` при попытке сохранить пользовательскую модель Keras
Anonymous » » в форуме PythonУ меня есть следующая подклассовая модель tensorflow.keras.Model Seq2Seq с настраиваемыми слоями:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint,... - 0 Ответы
- 8 Просмотры
-
Последнее сообщение Anonymous
-
-
-
`TypeError: Неподдерживаемый целочисленный размер (0)` при попытке сохранить пользовательскую модель Keras
Anonymous » » в форуме PythonУ меня есть подкласс модели tensorflow.keras.Model Seq2Seq с настраиваемыми слоями. Однако когда я пытаюсь сохранить его с помощью метода tensorflow.keras.Model.save(), выдается следующая ошибка:
File... - 0 Ответы
- 20 Просмотры
-
Последнее сообщение Anonymous
-
-
-
`TypeError: Неподдерживаемый целочисленный размер (0)` при попытке сохранить пользовательскую модель Keras
Anonymous » » в форуме PythonУ меня есть подкласс модели tensorflow.keras.Model Seq2Seq с настраиваемыми слоями. Однако когда я пытаюсь сохранить его с помощью метода tensorflow.keras.Model.save(), выдается следующая ошибка:
File... - 0 Ответы
- 16 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Подкласс Keras Model HDF5 (.keras). Ошибка сохранения формата при использовании tf.keras.Model.save() — TypeError: непод
Anonymous » » в форуме PythonПроблема
У меня есть подкласс модели tensorflow.keras.Model Seq2Seq с пользовательскими слоями, которая выдает следующую ошибку, когда я пытаюсь сохранить ее через tensorflow.keras.Model .save():
Traceback (most recent call last):
File... - 0 Ответы
- 51 Просмотры
-
Последнее сообщение Anonymous
-
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...