Приложение должно получить данные из API, упорядочить их в pandas.DataFrame, принудительно ввести тип int в
столбце "num" .astype (int).
После этого я хотел бы начать транзакцию с базой данных, чтобы
- создать временную таблицу
вставить данные в tempTable - объединить tempTable с prodTable
- зафиксировать транзакцию, если ошибок не произошло, или выполнить откат, если база данных вызвала ошибку ошибка.
при попытке запустите эту версию кода:
Схема таблицы SQL:
CREATE TABLE [dbo].[discountDimensions](
[ID] [int] IDENTITY(1,1) NOT NULL,
[num] [int] NOT NULL,
[name] [varchar](60) NULL,
[posPercent] [float] NULL
)
В DataProcessor.py:
def discounts_dim_to_df(self, data:dict):
df = pd.DataFrame(data["discounts"])
df = df.drop(columns=['num', 'name'])
df['mstrNum'] = df["mstrNum"].astype(int)
return df.rename(columns={"mstrNum":"num","mstrName":"name"})
В Conn.py:
class Conn:
def __init__(self, server:str, database:str, username:str, passwd:str):
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f'mssql+pyodbc://{username}:{passwd}@{server}:1433/{database}?driver={driver}'
try:
self.engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=self.engine)
self.session = Session()
except Exception as err:
print(f'Failed to connect to {server} -> {database}')
mm = Mailman()
mm.connect_to_db_failed(server=server,db=database,error_message=err)
def update_discounts_db(self, data:pd.DataFrame) -> bool:
create_temp_sql = '''
CREATE TABLE #tempDiscountDimensions (
num INT PRIMARY KEY,
name VARCHAR(60) NOT NULL,
posPercent FLOAT NULL
);
'''
merge_sql = '''
MERGE INTO discountDimensions AS target
USING #tempDiscountDimensions AS source
ON target.num = source.num
WHEN MATCHED THEN
UPDATE SET target.name = source.name,
target.posPercent = source.posPercent
WHEN NOT MATCHED THEN
INSERT (num, name, posPercent) VALUES (source.num, source.name, source.posPercent);
'''
try:
self.session.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
DROP TABLE #tempDiscountDimensions;"""))
self.session.execute(text(create_temp_sql))
data.to_sql('#tempDiscountDimensions',
con=self.session,
if_exists='append',
index=False,
dtype={
"num": Integer(),
"name": String(),
"posPercent": Float()
})
self.session.execute(text(merge_sql))
self.session.execute(text('DROP TABLE #tempDiscountDimensions'))
self.session.commit()
except Exception as err:
self.session.rollback()
print(f'Failed to update/insert data from discountDimensions table:\n{err}')
mm = Mailman()
mm.process_failed(option='db_connection', error=err)
return None
Выдает ошибку:
Traceback (most recent call last):
File "path\to\folder\main.py", line 81, in
conn.update_discounts_db(data=data_disc)
File "path\to\folder\Conn.py", line 82, in update_discounts_db
data.to_sql('#tempDiscountDimensions',
File "path\to\folder\_env\Lib\site-packages\pandas\util\_decorators.py", line 333, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "path\to\folder\_env\Lib\site-packages\pandas\core\generic.py", line 3087, in to_sql
return sql.to_sql(
^^^^^^^^^^^
File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 842, in to_sql
return pandas_sql.to_sql(
^^^^^^^^^^^^^^^^^^
File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 2839, in to_sql
raise ValueError(f"{col} ({my_type}) not a string")
ValueError: num (INTEGER) not a string
Но когда я меняю «self.session» на «с self.engine.begin() в качестве соединения», это работает. Однако я потерял возможность откатиться в случае ошибки.
try:
with self.engine.begin() as connection:
connection.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
DROP TABLE #tempDiscountDimensions;"""))
connection.execute(text(create_temp_sql))
data.to_sql('#tempDiscountDimensions',
con=connection,
if_exists='append',
index=False,
dtype={
"num": Integer(),
"name": String(),
"posPercent": Float()
})
connection.execute(text(merge_sql))
connection.execute(text('DROP TABLE #tempDiscountDimensions'))
Подробнее здесь: https://stackoverflow.com/questions/790 ... -session-b