Apache Beam 2.68.0 выдает предупреждение «Использование резервного детерминированного кодера для типа»Python

Программы на Python
Anonymous
 Apache Beam 2.68.0 выдает предупреждение «Использование резервного детерминированного кодера для типа»

Сообщение Anonymous »

В последней версии Apache Beam 2.68.0 изменено поведение программистов для непримитивных объектов. (см. журнал изменений здесь).
Поэтому я получаю подобное предупреждение в GCP Dataflow.

Код: Выделить всё

"Using fallback deterministic coder for type ''
in 'Run Pipeline/Select latest per Key/CombinePerKey(LatestCombineFn)/GroupByKey'. "
Это предупреждение также явно упоминается в разделе «Критические изменения».

(Python) Детерминированный запасной кодировщик для сложных типов, таких как NamedTuple, Enum и классов данных, теперь использует CloudPickle вместо dill. Если ваш конвейер затронут, вы можете увидеть предупреждение типа: «Использование резервного детерминированного кодера для типа X…». Вы можете вернуться к предыдущему поведению, используя параметр конвейера --update_compatibility_version=2.67.0 (35725). Сообщайте о любых проблемах, связанных с травлением, по номеру #34903

Они предлагают передать параметр --update_compatibility_version=2.67.0 в задание потока данных.
Но добавление этого параметра в задание потока данных не скрывает предупреждение!!!
Я не я уверен, почему это происходит, но мне хотелось бы знать, почему.
Самое главное: Я хочу знать, как правильно с этим справиться.
Ответственная часть кода выглядит так.

Код: Выделить всё

>> beam.WithKeys(lambda rec: rec.key).with_output_types((Tuple[MessageKey, Message]))
| "Window Input" >> beam.WindowInto(window.FixedWindows(60))
| "Select latest per Key" >> beam.combiners.Latest.PerKey() # > beam.Values()
Я также добавил подсказку типа (используя .with_output_types((Tuple[MessageKey, Message])) в приведенной выше строке, но она по-прежнему выдает то же предупреждение. Но я не думаю, что это необходимо, поскольку PerKey из луча уже определяет типы ввода и вывода.

Код: Выделить всё

@with_input_types(tuple[K, V])
@with_output_types(tuple[K, V])
class PerKey(ptransform.PTransform):
...

Код: Выделить всё

MessageKey
— это просто производное от BaseModel pydantic`.

Код: Выделить всё

class MessageKey(BaseModel):
...
Затем я создал собственный кодировщик:

Код: Выделить всё

class MessageKeyCoder(Coder):
def encode(self, value: MessageKey) -> bytes:
return json.dumps(value.model_dump(), sort_keys=True).encode("utf-8")

def decode(self, encoded: bytes) -> MessageKey:
data = json.loads(encoded.decode("utf-8"))
return MessageKey(**data)

def is_deterministic(self) -> bool:
return True

def estimate_size(self, value: MessageKey) -> int:
return len(self.encode(value))

Я зарегистрировал это, используя:

Код: Выделить всё

beam.coders.registry.register_coder(MessageKey, MessageKeyCoder)
Я пытался поместить это в разные места кода, но ничего не помогло устранить предупреждение.
  • Добавлено в тот же файл, что и MessageKeyCoder, чуть ниже него.
  • Добавлено сразу после всех импортов в файле, где определен конвейер.
  • Добавлено внутри контекстного менеджера с Pipeline(...) как p.


Подробнее здесь: https://stackoverflow.com/questions/797 ... pe-warning

Вернуться в «Python»