Код: Выделить всё
#!/usr/bin/env python3
# encoding: utf-8
import polars as pl
from sqlalchemy import Table, MetaData, func
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import Query, Session
...
frame=pl.DataFrame(data={'data':range(10)})
OUT_TABLE_NAME:str='transaction_test'
with EngineContext(uri=CONNECTION) as engine:
out_table: Table = Table(OUT_TABLE_NAME, MetaData(bind=engine, schema=SCHEMA_NAME), autoload=True)
with Session(bind=engine) as session:
session.begin()
try:
for item in frame.to_dicts():
print(item['data'])
if item['data']==5:
raise ValueError
statement=out_table.insert(values=[item])
print(statement)
session.execute(statement=statement)
sleep(1)
except ValueError:
session.rollback()
logging.info(msg='Session rolled back')
else:
session.commit()
logging.info(msg='Session committed')
Вставьте последовательность записей (каждая из которых содержит целое число) в большой запрос одну за другой.
< h5>До
Прежде чем запустить приведенный выше код, предположим, что таблица (в Google Bigquery) пуста.
После
Ожидаемый результат
Таблица остается пустой, поскольку в случае ошибки SQL Alchemy должен откатить сеанс.
Наблюдаемый результат
Таблица содержит значения от 0 до 4 включительно.
Библиотеки пипов
Код: Выделить всё
sqlalchemy==1.4.51 # ORM
sqlalchemy-bigquery==1.9.0 # Dialect for bigquery, need latest Ubuntu version
polars==1.2.1 # Tabular data manipulation
google-cloud-bigquery-storage==2.25.0 # Bigquery storage write client used for inserts
- Python 3.8
- Ubuntu 20.04
Почему Big Query не откатывает сессию и не оставляет таблицу пустой (восстанавливая предыдущее состояние до начала транзакции) ) вместо частичной фиксации?
Подробнее здесь: https://stackoverflow.com/questions/789 ... ansactions