У меня есть 50 000 записей, которые я хочу хранить в моем контейнере CosmosDB. Поскольку это занимает некоторое время, я ищу быстрый способ достичь этого.class CosmosDB:
def __init__(self) -> None:
self.client = CosmosClient(url="...", credential=config.azure_cosmosdb_access_key)
database = self.client.get_database_client("...")
self.container = database.get_container_client("...")
...
async def bulk_insert(self, data: List[Dict[str, Any]], batch_size: int = 1000) -> None:
semaphore = asyncio.Semaphore(100) # only 100 at once
async def _limited_upsert(item: Dict[str, Any]): # to avoid cosmos db limits
async with semaphore:
return await self.container.upsert_item(item)
successful_writes = 0
failed_writes = 0
for i in range(0, len(data), batch_size): # to limit system load
batch = data[i:i + batch_size]
operations = [_limited_upsert(item) for item in batch]
results = await asyncio.gather(*operations, return_exceptions=True)
for j, result in enumerate(results):
item_id = data[j].get('id', 'N/A')
if isinstance(result, CosmosHttpResponseError):
failed_writes += 1
logger.error(
f"Error writing item '{item_id}': Status {result.status_code}, Message: {result.message}"
)
if result.status_code == StatusCodes.CONFLICT:
logger.warning(f" Item '{item_id}' might already exist.")
elif isinstance(result, Exception):
failed_writes += 1
logger.error(f"Unexpected error for item '{item_id}': {result}")
else:
successful_writes += 1
logger.info(f"\nBulk operation completed. Successful: {successful_writes}, Failed: {failed_writes}")
Подробнее здесь: https://stackoverflow.com/questions/797 ... -in-python
Azure Cosmosdb Bulk Insert в Python ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Спящий режим: объединить последовательные операторы INSERT в многозначный INSERT
Anonymous » » в форуме JAVA - 0 Ответы
- 59 Просмотры
-
Последнее сообщение Anonymous
-