import os
import json
import platform
import pymysql as pm
from datetime import date
import logging
from logging.handlers import TimedRotatingFileHandler
import logger
import requests
import boto3
import json, logging
from botocore.exceptions import ClientError
class LogOutput:
env=None
config=None
def __init__(self, env_filename):
self.env=env_filename
self.config=self.get_config()
def get_config(self):
with open(self.env) as file_in:
return json.load(file_in)
def is_Windows():
if "win" in (platform.system().lower()):
return True
else:
return False
def setup_logger():
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Rotating log setup
log_filename = f"E:\\file\\Logs\\log-{date.today()}.log"
file_handler = TimedRotatingFileHandler(log_filename, when="midnight", backupCount=7)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
setup_logger()
def DB_connection(self):
connection = None
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
config = self.get_config()
host=config["log-output"]["database-secrets"]["host"]
port=config["log-output"]["database-secrets"]["port"]
database=config["log-output"]["database-secrets"]["db"]
username=config["log-output"]["database-secrets"]["username"]
password=config["log-output"]["database-secrets"]["password"]
# Load the configuration from DB_conn_info_local.json
config_file_path = os.path.join(os.path.dirname(__file__), 'DB_conn_info_local.json')
with open(config_file_path) as config_file:
config = json.load(config_file)
connection = pm.connect(user=username,password=password,host=host,port=port,database=database)
logger.info("Successfully connected to database")
except Exception as e:
logging.info("Unable to connect to database: %s", str(e))
return connection
def logging_sp(self):
Query_1 = """
select query 1
Commit;
"""
Query_2 = """
select query 2
Commit;
"""
Query_3 = """
select query 3
commit;
"""
Query_4 = """
/*List of transactions to update. */
select query 3
commit;
"""
Query_5 = """
select query 5
"""
cnxn = self.DB_connection()
#query_list = ['Query_1', 'Query_2', 'Query_3', 'Query_4', 'Query_5']
if cnxn is None:
logging.error("Database connection is None. Cannot proceed.")
return # Exit here if there's no valid connection
try:
with cnxn.cursor() as cur:
cur.execute(Query_1)
cur.execute(Query_2)
cur.execute(Query_3)
cur.execute(Query_4)
cur.execute(Query_5)
except pm.err.Error as e:
logging.error("Error: %s", e)
apikey=self.get_secret()
self.send_opsgenie_alert(apikey)
else:
print("Query_1 ran successfully, {} records updated.".format(cur.rowcount), '\n')
print("Query_2 ran successfully, {} records updated.".format(cur.rowcount), '\n')
print("Query_3 ran successfully, {} records updated.".format(cur.rowcount), '\n')
print("Query_4 ran successfully, {} records updated.".format(cur.rowcount), '\n')
print("Query_5 ran successfully, {} records updated.".format(cur.rowcount), '\n')
cnxn.commit()
cnxn.close() # Close the connection
def main():
cwd=os.getcwd()
if "win" in (platform.system().lower()):
vfc=(cwd+"\\DB_conn_info_local"+".json")
else:
vfc=(cwd+"/DB_conn_info_local"+".json")
ve=LogOutput(vfc)
ve.logging_sp()
if __name__ == "__main__":
main()
Сейчас он настроен на выполнение всех запросов сразу друг за другом, но если произойдет 1 ошибка, остальные не будут выполняться. Я хочу запускать их один за другим. Я создал словарь, который закомментировал, потому что думаю, что это то, что мне нужно использовать, а также, возможно, использовать цикл for? Таким образом, он проходит каждый запрос, затем выполняет его, а затем переходит к следующему, даже если первый не удался...
У меня есть код: [code]import os import json import platform import pymysql as pm from datetime import date import logging from logging.handlers import TimedRotatingFileHandler import logger import requests import boto3 import json, logging from botocore.exceptions import ClientError
# Load the configuration from DB_conn_info_local.json config_file_path = os.path.join(os.path.dirname(__file__), 'DB_conn_info_local.json') with open(config_file_path) as config_file: config = json.load(config_file)
Query_4 = """ /*List of transactions to update. */
select query 3 commit; """
Query_5 = """
select query 5 """ cnxn = self.DB_connection() #query_list = ['Query_1', 'Query_2', 'Query_3', 'Query_4', 'Query_5'] if cnxn is None: logging.error("Database connection is None. Cannot proceed.") return # Exit here if there's no valid connection try: with cnxn.cursor() as cur: cur.execute(Query_1) cur.execute(Query_2) cur.execute(Query_3) cur.execute(Query_4) cur.execute(Query_5) except pm.err.Error as e: logging.error("Error: %s", e) apikey=self.get_secret() self.send_opsgenie_alert(apikey) else: print("Query_1 ran successfully, {} records updated.".format(cur.rowcount), '\n') print("Query_2 ran successfully, {} records updated.".format(cur.rowcount), '\n') print("Query_3 ran successfully, {} records updated.".format(cur.rowcount), '\n') print("Query_4 ran successfully, {} records updated.".format(cur.rowcount), '\n') print("Query_5 ran successfully, {} records updated.".format(cur.rowcount), '\n') cnxn.commit() cnxn.close() # Close the connection
def main(): cwd=os.getcwd() if "win" in (platform.system().lower()): vfc=(cwd+"\\DB_conn_info_local"+".json") else: vfc=(cwd+"/DB_conn_info_local"+".json") ve=LogOutput(vfc) ve.logging_sp()
if __name__ == "__main__": main() [/code] Сейчас он настроен на выполнение всех запросов сразу друг за другом, но если произойдет 1 ошибка, остальные не будут выполняться. Я хочу запускать их один за другим. Я создал словарь, который закомментировал, потому что думаю, что это то, что мне нужно использовать, а также, возможно, использовать цикл for? Таким образом, он проходит каждый запрос, затем выполняет его, а затем переходит к следующему, даже если первый не удался...
У меня есть код:
import os
import json
import platform
import pymysql as pm
from datetime import date
import logging
from logging.handlers import TimedRotatingFileHandler
import logger
import requests
import boto3
import json, logging
from...
Короче говоря, у меня есть проблема с асинхронностью с системой авторизации.
Я использую Rxjs для выполнения этих операций с наблюдаемыми. Любая из этих операций должна ждать, что в настоящее время нет одного из уже запущенных, и если это дело,...
Постановка проблемы:
Перед обновлением до Spring Boot 3 и Spring Batch 5 я мог выполнять несколько объектов Job, определенных в моем приложении, в одном и том же файл. Задания будут выполняться последовательно в зависимости от их порядка в...
Постановка проблемы:
Перед обновлением до Spring Boot 3 и Spring Batch 5 я мог выполнять несколько объектов Job, определенных в моем приложении, в одном и том же файл. Задания будут выполняться последовательно в зависимости от их порядка в...
Мы проводим файлы конфигурации для разных сред в нашем репо. В рамках процесса CI я хотел бы сделать, чтобы эти файлы конфигурации всегда действительны. Для этого я создал этот тест, который копирует конфигурации, пытается запустить сервер и сразу...