Я пытаюсь моделировать создание нескольких файлов H5 параллельно, пытаясь прочитать результаты в прямом эфире с помощью динамически обновленного VDS H5 в режиме SWMR. Все файлы H5 обрабатывают различные куски одного и того же набора данных, и данные записываются параллельно. Событие, которое я хочу наблюдать, является увеличением размера файла, чтобы узнать, чтобы обновить VDS. Я пытался реализовать что -то похожее на то, что здесь показано, но не кажется, что размер моего виртуального набора данных увеличивается, и сюжет не обновляется. Я вижу, как сторожевой пейзаж запускает update_source, когда отдельные файлы детали меняют размер. Мое следующее предположение о том, что попробовать состоит в том, что все файлы должны оставаться открытыми для всех записей данных и что VDS Manager должен создавать файлы деталей и держать их открытыми, чтобы иметь обновление виртуального источника. На данный момент я просто хочу правильно увидеть обновление VDS из изменяющихся файлов деталей. < /P>
Вот одна версия кода, которую я пробовал: < /p>
import numpy as np
import h5py
import multiprocessing as mp
import time
import matplotlib.pyplot as plt
import math
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class VDSManager:
def __init__(self, num_files, vds_filename="vds.h5", directory="."):
self.num_files = num_files
self.vds_filename = vds_filename
self.directory = directory
self.UNLIM = h5py.h5s.UNLIMITED
self.vds_file = h5py.File(vds_filename, "w", libver="latest")
self.vds_file.swmr_mode = True
self.create_layout()
self.create_vds()
self.observer = Observer()
self.handler = self.VDSHandler(self)
self.observer.schedule(self.handler, self.directory, recursive=False)
self.observer.start()
print("VDS Manager started and VDS created.")
def create_layout(self):
fname = "output_1.h5"
if os.path.exists(fname):
with h5py.File(fname, "r", swmr=True) as f:
size = f["data"].shape[0]
else:
size = 0
self.virtual_layout = h5py.VirtualLayout((self.num_files, size),
maxshape=(self.num_files, None),
dtype='float64')
def create_vds(self):
for i in range(self.num_files):
fname = f"output_{i}.h5"
if os.path.exists(fname):
with h5py.File(fname, "r", swmr=True) as f:
size = f["data"].shape[0]
else:
size = 0
print(size)
vsource = h5py.VirtualSource(fname, "data", shape=(size,), maxshape=(None,))
self.virtual_layout[i, :self.UNLIM] = vsource[:self.UNLIM]
self.vds_file.create_virtual_dataset("vdata", self.virtual_layout, fillvalue=np.nan)
def update_source(self, file_index):
vs = self.vds_file['vdata'].virtual_sources()
vs[file_index].refresh()
vs.flush()
print(self.vds_file['vdata'][:].shape)
def create_source(self, file_index):
print("File created but I don't think I actually need to do anything!")
class VDSHandler(FileSystemEventHandler):
def __init__(self, manager):
super().__init__()
self.manager = manager
def on_modified(self, event):
if event.is_directory:
return
filename = os.path.basename(event.src_path)
if filename.startswith("output_") and filename.endswith(".h5"):
try:
idx = int(filename.split("_")[1].split(".")[0])
print(f"Detected modification in {filename}")
self.manager.update_source(idx)
except Exception as e:
print("Error processing modified event:", e)
def on_created(self, event):
if event.is_directory:
return
filename = os.path.basename(event.src_path)
if filename.startswith("output_") and filename.endswith(".h5"):
try:
idx = int(filename.split("_")[1].split(".")[0])
print(f"Detected creation of {filename}")
self.manager.create_source(idx)
except Exception as e:
print("Error processing created event:", e)
def close(self):
self.observer.stop()
self.observer.join()
self.vds_file.close()
print("VDS Manager closed.")
def worker(file_index, task_queue):
"""
Each worker writes its processed data to its own file.
"""
filename = f"output_{file_index}.h5"
print(f"Worker {file_index}: Opening {filename} for writing.")
chunk = 0
with h5py.File(filename, "w", libver="latest") as f:
# create a dataset with unlimited growth axis
dset = f.create_dataset("data", (0,), maxshape=(None,), dtype="float64")
f.swmr_mode = True
while True:
print(f"Working on chunk {chunk} for file {file_index}")
task = task_queue.get()
if task is None:
print(f"Worker {file_index}: Terminating.")
break
processed_chunk = np.sin(task)
n = processed_chunk.shape[0]
old_size = dset.shape[0]
new_size = old_size + n
dset.resize((new_size,))
dset[old_size:new_size] = processed_chunk
print(f"dataset size for dataset {file_index}: {dset.shape}")
try:
f.flush() # Flush updates so SWMR readers see them.
except Exception as e:
print(f"Worker {file_index}: flush failed: {e}")
file_size = os.path.getsize(filename)
print(f"Worker {file_index}: File size is {file_size} bytes.")
print(f"Worker {file_index}: Wrote chunk of {n} elements.")
time.sleep(0.1)
chunk += 1
def live_plot_vds(vds, poll_interval=0.1):
plt.ion()
fig, ax = plt.subplots()
line, = ax.plot([], [], "b-", lw=2)
ax.set_xlabel("Index")
ax.set_ylabel("sin(value)")
ax.set_title("Live VDS Data")
while True:
time.sleep(poll_interval)
try:
ds = vds.vds_file['vdata']
ds.id.refresh()
data = ds[:]
print(data.shape)
except Exception as e:
print("Error reading VDS:", e)
data = np.array([])
if data.size > 0:
flat_data = data.flatten()
x = np.arange(flat_data.size)
line.set_data(x, flat_data)
ax.set_xlim(0, flat_data.size)
ax.set_ylim(-1.1, 1.1)
fig.canvas.draw()
fig.canvas.flush_events()
if not plt.fignum_exists(fig.number):
print("Plot window closed. Exiting live plot.")
break
plt.ioff()
plt.show()
def main():
N = 100_000 # Total number of data points.
chunk_size = 1000 # Size of each chunk.
NUM_FILES = 4 # Number of worker processes / output files.
data = np.linspace(0, 10 * np.pi, N)
# Delete existing VDS and output files before starting.
files_to_delete = ["vds.h5"] + [f"output_{i}.h5" for i in range(NUM_FILES)]
for fname in files_to_delete:
if os.path.exists(fname):
os.remove(fname)
print(f"Deleted existing file: {fname}")
# Launch worker processes.
queues = [mp.Queue() for _ in range(NUM_FILES)]
processes = []
for i in range(NUM_FILES):
p = mp.Process(target=worker, args=(i, queues))
p.start()
processes.append(p)
# Dispatch data chunks in round-robin order.
num_chunks = math.ceil(N / chunk_size)
for i in range(num_chunks):
chunk = data[i * chunk_size: (i + 1) * chunk_size]
file_idx = i % NUM_FILES
queues[file_idx].put(chunk)
print(f"Main: Sent chunk {i} (size {chunk.shape[0]}) to worker {file_idx}.")
# Create the VDSManager (which opens the VDS file and starts watchdog).
vds_manager = VDSManager(NUM_FILES, vds_filename="vds.h5", directory=".")
# Start live plotting.
live_plot_vds(vds=vds_manager, poll_interval=0.1)
# Signal termination to each worker.
for q in queues:
q.put(None)
# Wait for workers to finish.
for p in processes:
p.join()
vds_manager.close()
print("All workers finished.")
if __name__ == "__main__":
main()
Подробнее здесь: https://stackoverflow.com/questions/795 ... ple-writer
Динамически обновлять VDS H5Py из данных, записанных на параллельно - несколько писателей -считывателей. ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Не удается установить H5Py (ошибка неудачного строительного колеса для H5Py)
Anonymous » » в форуме Python - 0 Ответы
- 38 Просмотры
-
Последнее сообщение Anonymous
-