# /// script
# requires-python = "~=3.12.0"
# dependencies = [
# "polars[pyarrow]==1.38.1",
# ]
# ///
"""Minimal working example for scan IPC bug."""
import logging
from multiprocessing.shared_memory import SharedMemory
import polars as pl
import pyarrow as pa
# Module-level logger for this script (named after the module per convention).
LOG = logging.getLogger(__name__)
# Print polars and dependency versions up front — useful context for the bug report.
pl.show_versions()
if __name__ == "__main__":
    # Make LOG output visible when run as a script (no handlers are
    # configured otherwise).
    logging.basicConfig(level=logging.INFO)

    # --- Write side: serialize a toy frame into shared memory ---
    data = pl.DataFrame({"a": [1, 2, 3]})

    # MockOutputStream only counts bytes written; it lets us size the
    # shared-memory segment before doing the real write.
    with pa.MockOutputStream() as sink:
        data.write_ipc(sink)

    shm = SharedMemory(name="test-ipc-polars", create=True, size=sink.size())
    try:
        # FixedSizeBufferWriter writes the IPC stream directly into the
        # shm-backed buffer; the `with` block closes the writer for us.
        with pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf)) as stream:
            data.write_ipc(stream)
        shm.close()

        # --- Read side: attach to the same segment by name ---
        existing_ = SharedMemory(name=shm.name, create=False)
        buf_ = pa.py_buffer(existing_.buf)
        # out = pl.read_ipc(buf_, use_pyarrow=True)  # Works (eager path)
        try:
            # scan_ipc expects a path or a file-like object with .read();
            # a raw pyarrow Buffer raises TypeError here — the bug this
            # MWE demonstrates. Workaround: pl.scan_ipc(io.BytesIO(buf_)),
            # but that copies the data, defeating shared memory.
            res = pl.scan_ipc(buf_)
            LOG.info("scan_ipc succeeded: %s", res)
        except Exception:
            # logger.exception is the idiomatic way to log with traceback.
            LOG.exception("scan_ipc failed")
        finally:
            # Release the exported buffer before closing the attachment;
            # a live memoryview export would block SharedMemory.close().
            del buf_
            existing_.close()
    finally:
        # Always remove the segment, even if the write phase raised.
        shm.unlink()
Ниже приведён исходный текст вопроса — включая тот же скрипт, вывод подробного журналирования и трассировку ошибки:
Я пытаюсь создать систему, в которой я буду читать/записывать Polars из общей памяти с помощью Arrow IPC. У меня работает нетерпеливый код ([code]read_ipc[/code] работает), но функция scan_ipc у меня не работает. Следующий скрипт иллюстрирует проблему: [code]# /// script # requires-python = "~=3.12.0" # dependencies = [ # "polars[pyarrow]==1.38.1", # ] # /// """Minimal working example for scan IPC bug."""
import logging from multiprocessing.shared_memory import SharedMemory
import polars as pl import pyarrow as pa
LOG = logging.getLogger(__name__)
pl.show_versions()
if __name__ == "__main__": # Create a toy dataset data = pl.DataFrame({"a": [1, 2, 3]}) with pa.MockOutputStream() as sink: data.write_ipc(sink)
shm = SharedMemory(name="test-ipc-polars", create=True, size=sink.size()) with pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf)) as stream: data.write_ipc(stream) del stream shm.close()
# Read the dataset back existing_ = SharedMemory(shm.name, create=False) buf_ = pa.py_buffer(existing_.buf) # out = pl.read_ipc(buf_, use_pyarrow=True) # Works # print(out)
try: res = pl.scan_ipc(buf_) except Exception as exc: LOG.error(exc, stack_info=True, exc_info=True) finally: del buf_ existing_.close() shm.unlink() [/code] Запуск этого сценария с подробным журналированием дает следующий результат: [code]_init_credential_provider_builder(): credential_provider_init = None async thread count: 4 polars-stream: updating graph state polars-stream: running in-memory-source in subgraph polars-stream: running io-sink[single-file[ipc]] in subgraph async upload_chunk_size: 67108864 async upload_concurrency: 8 io-sink[single-file[ipc]]: start_single_file_sink_pipeline: file_writer_starter: ipc, takeable_rows_provider: TakeableRowsProvider { max_size: NonZeroRowCountAndSize { num_rows: 122880, num_bytes: 18446744073709551615 }, byte_size_min_rows: 16384, allow_non_max_size: false }, upload_chunk_size: 67108864 polars-stream: done running graph phase polars-stream: updating graph state io-sink[single-file[ipc]]: Join on task_handle (recv PortState::Done) io-sink[single-file[ipc]]: Statistics: total_size: RowCountAndSize { num_rows: 3, num_bytes: 24 } _init_credential_provider_builder(): credential_provider_init = None polars-stream: updating graph state polars-stream: running io-sink[single-file[ipc]] in subgraph polars-stream: running in-memory-source in subgraph io-sink[single-file[ipc]]: start_single_file_sink_pipeline: file_writer_starter: ipc, takeable_rows_provider: TakeableRowsProvider { max_size: NonZeroRowCountAndSize { num_rows: 122880, num_bytes: 18446744073709551615 }, byte_size_min_rows: 16384, allow_non_max_size: false }, upload_chunk_size: 67108864 polars-stream: done running graph phase polars-stream: updating graph state io-sink[single-file[ipc]]: Join on task_handle (recv PortState::Done) io-sink[single-file[ipc]]: Statistics: total_size: RowCountAndSize { num_rows: 3, num_bytes: 24 } _init_credential_provider_builder(): credential_provider_init = None argument 'sources': Object does not have a .read() method. 
Traceback (most recent call last): File "/workspaces/github/mwe.py", line 36, in res = pl.scan_ipc(buf_) ^^^^^^^^^^^^^^^^^ File "/app/.cache/uv/environments-v2/mwe-63942327ce1a05a8/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 128, in wrapper return function(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/.cache/uv/environments-v2/mwe-63942327ce1a05a8/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 128, in wrapper return function(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/app/.cache/uv/environments-v2/mwe-63942327ce1a05a8/lib/python3.12/site-packages/polars/io/ipc/functions.py", line 506, in scan_ipc pylf = PyLazyFrame.new_from_ipc( ^^^^^^^^^^^^^^^^^^^^^^^^^ TypeError: argument 'sources': Object does not have a .read() method. Stack (most recent call last): File "/workspaces/github/mwe.py", line 38, in LOG.error(exc, stack_info=True, exc_info=True) [/code] Мне бы хотелось иметь возможность лениво сканировать буфер!