Apache Beam: количество компонентов не соответствует количеству кодеров при использовании таймеровPython

Программы на Python
Ответить
Anonymous
 Apache Beam: количество компонентов не соответствует количеству кодеров при использовании таймеров

Сообщение Anonymous »

Мой вопрос очень похож на этот
Я пытаюсь добавить несколько таймеров для дальнейшей обработки некоторых данных, но получаю следующую ошибку

Код: Выделить всё

Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.process
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 158, in process
self.windowed_coder_impl.encode_to_stream(
File "apache_beam/coders/coder_impl.py", line 1448, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 1467, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 1023, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.encode_to_stream
ValueError: Number of components does not match number of coders.
моя глубина

Код: Выделить всё

class WaitUntilDevicesExist(beam.DoFn):
BUFFER_STATE = beam.transforms.userstate.BagStateSpec('buffer', beam.coders.StrUtf8Coder())
TIMER = beam.transforms.userstate.TimerSpec('timer', beam.TimeDomain.REAL_TIME)

BUFFER_TIMER = 15  # seconds

...

def process(self, key_value, timer=beam.DoFn.TimerParam(TIMER), buffer=beam.DoFn.StateParam(BUFFER_STATE)):
shard_id, batch = key_value

for message in batch:
logging.info(f"Checking = {message}")

...

if (...):
timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=self.BUFFER_TIMER))
buffer.add(DeviceCheckHelper(message).to_string())
else:
yield message

@beam.transforms.userstate.on_timer(TIMER)
def expiry_callback(self, timer=beam.DoFn.TimerParam(TIMER), buffer=beam.DoFn.StateParam(BUFFER_STATE)):
events = buffer.read()
logging.info("Timer")
new_buffer = []

for row in events:
message = DeviceCheckHelper.from_string(row)
logging.info(message)

....

if (...):
if retry == 3:
logging.info(f"Waited 3 times, yielding ")
yield message.message
else:
message.increase_retry()
new_buffer.append(message.to_string())
logging.info(f"retry = {message}")

buffer.clear()
timer.clear()

logging.info(f"New buffer = {new_buffer}")
if new_buffer:
for row in new_buffer:
logging.info(f"Adding {row}")
buffer.add(row)

timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=self.BUFFER_TIMER))
и конвейер выглядит так

Код: Выделить всё

# 1 filter messages
filtered_messages = (
transformed_messages[TransformData.TAG_OK]
| f"Clean Devices {tenant}" >> beam.ParDo(FilterMessages()).with_outputs(FilterMessages.DEVICE_TAG, FilterMessages.OBSERVATION_TAG)
)

# 2 Write observations

observation_results = (
filtered_messages[FilterMessages.OBSERVATION_TAG]
| f"{tenant} Check Devices" >> beam.ParDo(WaitUntilDevicesExist(...))
| f"{tenant} Window Observations messages" >> GroupMessagesByShardedKey(max_messages=200, max_waiting_time=10, shard_key="obs", num_shards=10)
| f"{tenant} Write Observations" >> beam.ParDo(Write(...)).with_outputs(FAILED_TAG)

)

Если я перемещаю WaitUntilDevicesExist после GroupMessagesByShardedKey, все работает нормально. Что мне не хватает?

Подробнее здесь: https://stackoverflow.com/questions/791 ... -using-tim
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»