Apache Beam - convert an empty DataFrame back to a PCollection


Posted by Anonymous

I'm experimenting with Apache Beam and struggling to correctly convert a DataFrame back to a PCollection. I tested a left anti join implementation with the following logic:
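from apache_beam.dataframe.frames import DeferredDataFrame


def left_anti_join(
    left: DeferredDataFrame,
    right: DeferredDataFrame,
    left_index: str,
    right_index: str,
) -> DeferredDataFrame:
    # Left-join against the right frame; indicator=True adds a "_merge"
    # column recording whether each left row found a match.
    inner_joined_df = left.merge(
        right.set_index(right_index),
        right_index=True,
        left_on=left_index,
        how="left",
        indicator=True,
    )
    # Keep only the rows without a match on the right, then drop the helper column.
    return inner_joined_df[inner_joined_df._merge == "left_only"].drop("_merge", axis=1)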
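To make the logic above concrete, here is a minimal sketch of the same indicator-based anti join on plain pandas DataFrames. It only illustrates what the merge produces, including the empty case; the function name left_anti_join_pandas and the sample data are made up for this example, and Beam's deferred DataFrames may behave differently.

```python
import pandas as pd


def left_anti_join_pandas(left, right, left_key, right_key):
    """Return the rows of `left` whose key has no match in `right` (plain pandas)."""
    joined = left.merge(
        right.set_index(right_key),
        left_on=left_key,
        right_index=True,
        how="left",
        indicator=True,  # adds a "_merge" column: "left_only" or "both"
    )
    return joined[joined["_merge"] == "left_only"].drop("_merge", axis=1)


# Hypothetical sample data.
left = pd.DataFrame({"id": [1, 2, 3], "v": ["a", "b", "c"]})
right_partial = pd.DataFrame({"id": [2, 3], "w": ["x", "y"]})
right_all = pd.DataFrame({"id": [1, 2, 3], "w": ["x", "y", "z"]})

# Only id 1 is missing from right_partial, so one row survives.
result = left_anti_join_pandas(left, right_partial, "id", "id")

# Every id is matched, so the anti join is empty: the case that fails in the pipeline.
empty_result = left_anti_join_pandas(left, right_all, "id", "id")

print(result)
print(empty_result.empty)
```

In plain pandas the empty result is an ordinary DataFrame with zero rows, so the failure described below seems to come from the conversion step rather than from the join itself.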
After running this function, I convert its output to a PCollection with:
from apache_beam.dataframe.convert import to_pcollection

data = left_anti_join(df1, df2, "id", "id")
to_pcollection(data)
Everything works fine as long as the deferred DataFrame returned by my "left_anti_join" function is not empty. When it is empty, the pipeline fails with:
/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py:226: RuntimeWarning: invalid value encountered in long_scalars
mean_element_size = self.producer_batch_converter.estimate_byte_size(
Traceback (most recent call last):
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 623, in invoke_process
self.output_handler.handle_process_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1600, in handle_process_outputs
self._write_batch_to_tag(tag, windowed_batch, watermark_estimator)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1704, in _write_batch_to_tag
self.main_receivers.receive_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 365, in receive_batch
self.update_counters_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 206, in update_counters_batch
self.opcounter.update_from_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py", line 228, in update_from_batch
self.mean_byte_counter.update_n(mean_element_size, batch_length)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/utils/counters.py", line 222, in update_n
self._fast_add_input_n(value, n)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/transforms/cy_combiners.py", line 213, in add_input_n
element = int(element)
ValueError: cannot convert float NaN to integer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/mydir/beam_jobs/test_job.py", line 163, in <module>
with beam.Pipeline(options=beam_options) as p:
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/pipeline.py", line 597, in __exit__
self.result = self.run()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/pipeline.py", line 574, in run
return self.runner.run_pipeline(self, self._options)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 199, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 442, in run_stages
bundle_results = self._execute_bundle(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _execute_bundle
self._run_bundle(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 999, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1309, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
return getattr(self, request_type)(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1009, in process_bundle
op.finish()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 943, in finish
self.dofn_runner.finish()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1479, in finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1460, in _invoke_bundle_method
self._reraise_augmented(exn)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1491, in _reraise_augmented
raise exn
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1458, in _invoke_bundle_method
bundle_method()
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 566, in invoke_finish_bundle
self.output_handler.finish_bundle_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1745, in finish_bundle_outputs
self.main_receivers.receive(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1419, in process
self._reraise_augmented(exn)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1491, in _reraise_augmented
raise exn
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 837, in invoke_process
self._invoke_process_per_window(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 981, in _invoke_process_per_window
self.output_handler.handle_process_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1581, in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1696, in _write_value_to_tag
self.tagged_receivers[tag].receive(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1419, in process
self._reraise_augmented(exn)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1507, in _reraise_augmented
raise new_exn.with_traceback(tb)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 623, in invoke_process
self.output_handler.handle_process_outputs(
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1600, in handle_process_outputs
self._write_batch_to_tag(tag, windowed_batch, watermark_estimator)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/common.py", line 1704, in _write_batch_to_tag
self.main_receivers.receive_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 365, in receive_batch
self.update_counters_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/operations.py", line 206, in update_counters_batch
self.opcounter.update_from_batch(windowed_batch)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/runners/worker/opcounters.py", line 228, in update_from_batch
self.mean_byte_counter.update_n(mean_element_size, batch_length)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/utils/counters.py", line 222, in update_n
self._fast_add_input_n(value, n)
File "/mydir/.venv/lib/python3.10/site-packages/apache_beam/transforms/cy_combiners.py", line 213, in add_input_n
element = int(element)
ValueError: cannot convert float NaN to integer [while running 'Unbatch 'drop_DataFrame_6039969152'']
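A plausible reading of the traceback, not confirmed against the Beam source: the mean element size is computed as an estimated byte size divided by the batch length, and for an empty batch both are zero. With NumPy integers, 0 / 0 yields NaN plus an "invalid value encountered" warning instead of raising, and the later int(element) call then fails. The sketch below reproduces just that arithmetic; batch_bytes and batch_length are hypothetical stand-ins, not Beam variables.

```python
import numpy as np

# Hypothetical stand-ins: an empty batch is assumed to have zero rows
# and an estimated size of zero bytes.
batch_bytes = np.int64(0)
batch_length = np.int64(0)

# NumPy scalar division 0 / 0 returns NaN rather than raising.
# It normally emits a RuntimeWarning, which is silenced here.
with np.errstate(invalid="ignore"):
    mean_element_size = batch_bytes / batch_length

# Converting NaN to int is what raises the ValueError seen in the traceback.
try:
    int(mean_element_size)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

If this reading is right, the error is triggered by a zero-row batch reaching the "Unbatch" step, not by anything specific to the join.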
What is the correct way to handle such empty-data cases? I tried counting the number of elements before converting it to a PCollection, but that feels inefficient.

More details here: https://stackoverflow.com/questions/749 ... collection