Поэтому я получаю подобное предупреждение в GCP Dataflow.
Код: Выделить всё
"Using fallback deterministic coder for type ''
in 'Run Pipeline/Select latest per Key/CombinePerKey(LatestCombineFn)/GroupByKey'. "
(Python) Детерминированный запасной кодировщик для сложных типов, таких как NamedTuple, Enum и классов данных, теперь использует CloudPickle вместо dill. Если ваш конвейер затронут, вы можете увидеть предупреждение типа: «Использование резервного детерминированного кодера для типа X…». Вы можете вернуться к предыдущему поведению, используя параметр конвейера --update_compatibility_version=2.67.0 (35725). Сообщайте о любых проблемах, связанных с травлением, по номеру #34903
Они предлагают передать параметр --update_compatibility_version=2.67.0 в задание потока данных.
Но добавление этого параметра в задание потока данных не скрывает предупреждение!!!
Я не уверен, почему это происходит, но мне хотелось бы знать почему.
Самое главное: я хочу знать, как правильно с этим справиться.
Ответственная часть кода выглядит так.
Код: Выделить всё
>> beam.WithKeys(lambda rec: rec.key).with_output_types((Tuple[MessageKey, Message]))
| "Window Input" >> beam.WindowInto(window.FixedWindows(60))
| "Select latest per Key" >> beam.combiners.Latest.PerKey() # > beam.Values()
Код: Выделить всё
@with_input_types(tuple[K, V])
@with_output_types(tuple[K, V])
class PerKey(ptransform.PTransform):
...
Код: Выделить всё
MessageKeyКод: Выделить всё
class MessageKey(BaseModel):
...
Код: Выделить всё
class MessageKeyCoder(Coder):
def encode(self, value: MessageKey) -> bytes:
return json.dumps(value.model_dump(), sort_keys=True).encode("utf-8")
def decode(self, encoded: bytes) -> MessageKey:
data = json.loads(encoded.decode("utf-8"))
return MessageKey(**data)
def is_deterministic(self) -> bool:
return True
def estimate_size(self, value: MessageKey) -> int:
return len(self.encode(value))
Код: Выделить всё
beam.coders.registry.register_coder(MessageKey, MessageKeyCoder)
- Добавлено в тот же файл, что и MessageKeyCoder, чуть ниже него.
- Добавлено сразу после всех операций импорта в файле, где определен конвейер.
- Добавлено внутри контекстного менеджера с помощью Pipeline(...) как p.
Подробнее здесь: https://stackoverflow.com/questions/797 ... pe-warning