Приложение должно получить данные из API, упорядочить их в pandas.DataFrame, принудительно ввести тип int в
столбце "num" .astype (int).
После этого я хотел бы начать транзакцию с базой данных, чтобы
- создать временную таблицу
вставить данные в tempTable - объединить tempTable с prodTable
- зафиксировать транзакцию, если ошибок не произошло, или выполнить откат, если база данных вызвала ошибку ошибка.
при попытке запустите эту версию кода:
class Conn:
def __init__(self, server:str, database:str, username:str, passwd:str):
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f'mssql+pyodbc://{username}:{passwd}@{server}:1433/{database}?driver={driver}'
try:
self.engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=self.engine)
self.session = Session()
except Exception as err:
print(f'Failed to connect to {server} -> {database}')
mm = Mailman()
mm.connect_to_db_failed(server=server,db=database,error_message=err)
def update_discounts_db(self, data:pd.DataFrame) -> bool:
create_temp_sql = '''
CREATE TABLE #tempDiscountDimensions (
num INT PRIMARY KEY,
name VARCHAR(60) NOT NULL,
posPercent FLOAT NULL
);
'''
merge_sql = '''
MERGE INTO discountDimensions AS target
USING #tempDiscountDimensions AS source
ON target.num = source.num
WHEN MATCHED THEN
UPDATE SET target.name = source.name,
target.posPercent = source.posPercent
WHEN NOT MATCHED THEN
INSERT (num, name, posPercent) VALUES (source.num, source.name, source.posPercent);
'''
try:
self.session.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
DROP TABLE #tempDiscountDimensions;"""))
self.session.execute(text(create_temp_sql))
data.to_sql('#tempDiscountDimensions',
con=self.session,
if_exists='append',
index=False,
dtype={
"num": Integer(),
"name": String(),
"posPercent": Float()
})
self.session.execute(text(merge_sql))
self.session.execute(text('DROP TABLE #tempDiscountDimensions'))
self.session.commit()
except Exception as err:
self.session.rollback()
print(f'Failed to update/insert data from discountDimensions table:\n{err}')
mm = Mailman()
mm.process_failed(option='db_connection', error=err)
return None
Но когда я меняю «self.session» на «с self.engine.begin() в качестве соединения», это работает. Однако я потерял возможность откатиться в случае ошибки.
try:
with self.engine.begin() as connection:
connection.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
DROP TABLE #tempDiscountDimensions;"""))
connection.execute(text(create_temp_sql))
data.to_sql('#tempDiscountDimensions',
con=connection,
if_exists='append',
index=False,
dtype={
"num": Integer(),
"name": String(),
"posPercent": Float()
})
connection.execute(text(merge_sql))
connection.execute(text('DROP TABLE #tempDiscountDimensions'))
Подробнее здесь: https://stackoverflow.com/questions/790 ... -session-b