Ленивое сканирование из общей памятиPython

Программы на Python
Ответить
Anonymous
 Ленивое сканирование из общей памяти

Сообщение Anonymous »

Я пытаюсь создать систему, в которой я буду читать/записывать Polars из общей памяти с помощью Arrow IPC. У меня работает нетерпеливый код (

Код: Выделить всё

read_ipc
работает), но функция scan_ipc у меня не работает. Следующий скрипт иллюстрирует проблему:

Код: Выделить всё

# /// script
# requires-python = "~=3.12.0"
# dependencies = [
#     "polars[pyarrow]==1.38.1",
# ]
# ///
"""Minimal working example for scan IPC bug."""

import logging
from multiprocessing.shared_memory import SharedMemory

import polars as pl
import pyarrow as pa

LOG = logging.getLogger(__name__)

pl.show_versions()

if __name__ == "__main__":
# Create a toy dataset
data = pl.DataFrame({"a": [1, 2, 3]})
with pa.MockOutputStream() as sink:
data.write_ipc(sink)

shm = SharedMemory(name="test-ipc-polars", create=True, size=sink.size())
with pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf)) as stream:
data.write_ipc(stream)
del stream
shm.close()

# Read the dataset back
existing_ = SharedMemory(shm.name, create=False)
buf_ = pa.py_buffer(existing_.buf)
# out = pl.read_ipc(buf_, use_pyarrow=True)  # Works
# print(out)

try:
res = pl.scan_ipc(buf_)
except Exception as exc:
LOG.error(exc, stack_info=True, exc_info=True)
finally:
del buf_
existing_.close()
shm.unlink()
Запуск этого сценария с подробным журналированием дает следующий результат:

Код: Выделить всё

_init_credential_provider_builder(): credential_provider_init = None
async thread count: 4
polars-stream: updating graph state
polars-stream: running in-memory-source in subgraph
polars-stream: running io-sink[single-file[ipc]] in subgraph
async upload_chunk_size: 67108864
async upload_concurrency: 8
io-sink[single-file[ipc]]: start_single_file_sink_pipeline: file_writer_starter: ipc, takeable_rows_provider: TakeableRowsProvider { max_size: NonZeroRowCountAndSize { num_rows: 122880, num_bytes: 18446744073709551615 }, byte_size_min_rows: 16384, allow_non_max_size: false }, upload_chunk_size: 67108864
polars-stream: done running graph phase
polars-stream: updating graph state
io-sink[single-file[ipc]]: Join on task_handle (recv PortState::Done)
io-sink[single-file[ipc]]: Statistics: total_size: RowCountAndSize { num_rows: 3, num_bytes: 24 }
_init_credential_provider_builder(): credential_provider_init = None
polars-stream: updating graph state
polars-stream: running io-sink[single-file[ipc]] in subgraph
polars-stream: running in-memory-source in subgraph
io-sink[single-file[ipc]]: start_single_file_sink_pipeline: file_writer_starter: ipc, takeable_rows_provider: TakeableRowsProvider { max_size: NonZeroRowCountAndSize { num_rows: 122880, num_bytes: 18446744073709551615 }, byte_size_min_rows: 16384, allow_non_max_size: false }, upload_chunk_size: 67108864
polars-stream: done running graph phase
polars-stream: updating graph state
io-sink[single-file[ipc]]: Join on task_handle (recv PortState::Done)
io-sink[single-file[ipc]]: Statistics: total_size: RowCountAndSize { num_rows: 3, num_bytes: 24 }
_init_credential_provider_builder(): credential_provider_init = None
argument 'sources': Object does not have a .read() method.
Traceback (most recent call last):
File "/workspaces/github/mwe.py", line 36, in 
res = pl.scan_ipc(buf_)
^^^^^^^^^^^^^^^^^
File "/app/.cache/uv/environments-v2/mwe-63942327ce1a05a8/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 128, in wrapper
return function(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.cache/uv/environments-v2/mwe-63942327ce1a05a8/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 128, in wrapper
return function(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.cache/uv/environments-v2/mwe-63942327ce1a05a8/lib/python3.12/site-packages/polars/io/ipc/functions.py", line 506, in scan_ipc
pylf = PyLazyFrame.new_from_ipc(
^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: argument 'sources': Object does not have a .read() method.
Stack (most recent call last):
File "/workspaces/github/mwe.py", line 38, in 
LOG.error(exc, stack_info=True, exc_info=True)
Мне бы хотелось иметь возможность лениво сканировать буфер!

Подробнее здесь: https://stackoverflow.com/questions/799 ... red-memory
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»