Чтобы справиться с этим, я реализовал специальный загрузчик данных, который считывает файлы .npy в пакетном режиме.
Вот мой Dataloader.py:
Код: Выделить всё
import numpy as np
import os
import gc
class Np_DataLoader_Cache:
def __init__(self, data_dir, file_list, batch_size, train_ratio=0.8):
self.data_dir = data_dir
self.batch_size = batch_size
self.file_list = file_list
self.train_ratio = train_ratio
self.current_file_id = -1
self.current_data = None
self.shapes = []
for file in file_list:
with open(os.path.join(data_dir, file), 'rb') as f:
version = np.lib.format.read_magic(f)
shape, fortran_order, dtype = np.lib.format.read_array_header_1_0(f)
self.shapes.append(shape[0])
self.prefix_sums = [0]
for s in self.shapes:
self.prefix_sums.append(self.prefix_sums[-1] + s)
self.total_samples = self.prefix_sums[-1]
self.train_end_idx = int(self.total_samples * self.train_ratio)
self.valid_start_idx = self.train_end_idx
def _load_file_if_needed(self, file_id):
if self.current_file_id != file_id:
if self.current_data is not None:
del self.current_data
gc.collect()
filename = self.file_list[file_id]
self.current_data = np.load(os.path.join(self.data_dir, filename), mmap_mode='r')
self.current_file_id = file_id
def _get_batch(self, start_row_global, end_row_global):
files_needed = []
current_row = start_row_global
file_id = 0
while current_row < end_row_global:
while file_id < len(self.prefix_sums) - 1 and self.prefix_sums[file_id + 1] = end_row:
return None
return self._get_batch(start_row, end_row)
def get_valid_batch(self, batch_index):
start_row = self.valid_start_idx + batch_index * self.batch_size
end_row = min(self.valid_start_idx + (batch_index + 1) * self.batch_size, self.total_samples)
if start_row >= end_row:
return None
return self._get_batch(start_row, end_row)
def close(self):
if self.current_data is not None:
del self.current_data
self.current_data = None
self.current_file_id = -1
gc.collect()
Для исследования я провел несколько экспериментов по измерению скорости пакетной загрузки:
Код: Выделить всё
import os
data_dir = './LOB_OFI_sortcode_NoResample/'
filelist = sorted([f for f in os.listdir(data_dir) if 'npy' in f])[:20]
from DataLoader_NP_Cache import Np_DataLoader_Cache
demo2 = Np_DataLoader_Cache(data_dir, filelist, batch_size = 4096, train_ratio = 0.8)
batch_nums = demo2.train_end_idx // demo2.batch_size + 1
import time
begin_t = time.time()
very_first = time.time()
for batch_index in range(batch_nums):
mini_batch = demo2.get_train_batch(batch_index)
if batch_index % 5000 == 0:
end_t = time.time()
print( f"Batch of {batch_index} Done. Process Ratio is {batch_index / batch_nums}" )
elapsed_time = end_t - begin_t
print(f"This 5000Batchs using time: {elapsed_time:.2f} s")
begin_t = time.time()
end_t = time.time()
elapsed_time = end_t - very_first
print(f"Total: {elapsed_time:.2f} s")

Я заметил, что:
Некоторые пакеты загружаются с разумной скоростью (~ 15 секунд на 5000 пакетов),
Но иногда пакет внезапно занимает более 100 секунд, и скорость сильно колеблется.
Эти замедления кажутся случайными — иногда они восстанавливаются, иногда нет. И иногда оно появляется довольно рано, а иногда и позже, как на картинке, которую я разместил. Но на самом деле он часто появляется для третьих или четвертых 5000 пакетов.
Мне интересно, что-то не так с тем, как я реализовал загрузчик данных или как numpy загружает данные (я использую np.load(..., mmap_mode='r') для внутренних целей).
Любые предложения или советы будут с благодарностью приняты. Спасибо!
Подробнее здесь: https://stackoverflow.com/questions/796 ... hen-iterat
Мобильная версия