Ошибка многопоточного рандеву при вставке данных Milvus — как исправить?Python

Программы на Python
Ответить
Anonymous
 Ошибка многопоточного рандеву при вставке данных Milvus — как исправить?

Сообщение Anonymous »

В настоящее время я запускаю скрипт Python для вставки набора данных, содержащего около 10 миллионов векторов, в коллекцию Milvus. Хотя процесс вставки работает хорошо для первых 40 пакетов по 10 000 записей, после этого момента я постоянно сталкиваюсь со следующей ошибкой:

Код: Выделить всё

[describe_collection] retry: X, cost: Y, reason: 
Вот скрипт, который я использую для вставки:

Код: Выделить всё

import numpy as np
from dtx_data_tools.iterate import batched, map_threaded

times = []
files = [f'agg_dataset/{fp}' for fp in os.listdir('agg_dataset') if 'parquet' in fp]

db_ids_set = set()
counter = 0

for batch_file in files:
prep_start = time.time()
df = pd.read_parquet(batch_file).drop_duplicates(subset='scrape_uuid', keep="last")

insert_ids = set(df['scrape_uuid'].tolist())

new_uuids = insert_ids - db_ids_set
df = df[df['scrape_uuid'].isin(new_uuids)]

db_ids_set.update(new_uuids)

prep_end  = time.time() - prep_start
print(f"prep time took {prep_end} seconds")

start = time.time()
transform_time = time.time()
df['content_vector'] = df['encoding'].apply(lambda x: np.frombuffer(x, dtype=np.float32))
df = df[['scrape_uuid', 'content_vector']]
print(f"Transform time: {time.time() - transform_time} seconds")

try:
client.insert(
collection_name="milvus_orb_benchmark",
data=df.to_dict('records')
)

counter += 10000
print(f"Imported {counter} articles..., batch {batch_file} uploaded")
end = time.time() - start
print(f"batch insert_time took {end} seconds")
times.append(end)
Я подозреваю, что эта проблема связана с тайм-аутом соединения с сервером gRPC. Для справки я также приложил свою конфигурацию оператора Milvus:

Код: Выделить всё

apiVersion: v1
kind: ServiceAccount
metadata:
name: milvus
annotations:
eks.amazonaws.com/role-arn: xxxxxx
apiVersion: milvus.io/v1beta1
kind: Milvus
metadata:
name: milvus
labels:
app: milvus
spec:
components:
serviceAccountName: milvus
config:
minio:
bucketName: xxxxxx
# enable AssumeRole
useIAM: true
useSSL: true
dependencies:
storage:
external: true
type: S3
endpoint:xxxxxxxx
secretRef: ""
Мои вопросы:
  • Как устранить ошибку MultiThreadedRendezvous: StatusCode.UNAVAILABLE?< /li>
    Есть ли определенные настройки, которые мне следует изменить в конфигурации Milvus или на сервере gRPC, чтобы предотвратить таймауты соединения во время процессов вставки больших данных?
  • Есть ли лучший способ обрабатывать вставку такого большой набор данных в Milvus и не столкнуться с этой проблемой?


Подробнее здесь: https://stackoverflow.com/questions/792 ... how-to-fix
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»