Я читаю данные из postgrseql, а затем записываю их в модель mariadb с помощью pyspark jdbc, и у меня возникла эта ошибка
Caused by: java.sql.BatchUpdateException: (conn=12) You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '"job_title","experience_level","employment_type") VALUES (?,?,?)' at line 1
Эта ошибка касается только первой таблицы, все таблицы имеют одинаковый тип
Я попытался перезаписать схему еще раз, используя приведение в pyspark, но это не сработало
это мой код для чтения из postgresql
def mariadb_loader():
try:
spark = SparkSession.builder \
.appName('mariadbloader') \
.config('spark.jars',
'/home/basel/main/Grad_proj/jars/mariadb-java-client-3.4.1.jar,/home/basel/main/Grad_proj/jars/postgresql-42.6.0.jar') \
.getOrCreate()
try:
df = spark.read.jdbc(
url="jdbc:postgresql://localhost:5423/airflow",
table='public.staging_data',
properties=psql_connection_properties
)
mariadb_dimensions_loader(df)
except Exception as e:
print(f"Error while reading from PostgreSQL or loading dimensions: {e}")
except Exception as e:
print(f"Error in mariadb_loader: {e}")
а это код для записи в mariadb
def mariadb_dimensions_loader(df):
try:
job_dim = df.select(
col("job_title").cast(StringType()),
col("experience_level").cast(StringType()),
col("employment_type").cast(StringType())
)
# Employee Dimension
employee_dim = df.select(col("employee_residence").cast(StringType()))
# Company Dimension
company_dim = df.select(
col("company_location").cast(StringType()),
col("company_size").cast(StringType()),
col("remote_ratio").cast(IntegerType())
)
# Currency Dimension
currency_dim = df.select(col("salary_currency").cast(StringType()))
tables = {
'job_dim': job_dim,
'employee_dim': employee_dim,
'company_dim': company_dim,
'currency_dim': currency_dim
}
for table_name ,data in tables.items():
try:
data.write.jdbc(
url="jdbc:mariadb://127.0.0.1:3306/main",
table=table_name,
mode='append',
properties=maria_connection_properties
)
except Exception as e:
print(f"Error while writing {table_name} to MariaDB: {e}")
except Exception as e:
print(f"Error in mariadb_dimensions_loader: {e}")
Подробнее здесь: https://stackoverflow.com/questions/790 ... ine-issues
Проблемы с конвейером данных ⇐ Python
Программы на Python
1728487888
Anonymous
Я читаю данные из postgrseql, а затем записываю их в модель mariadb с помощью pyspark jdbc, и у меня возникла эта ошибка
Caused by: java.sql.BatchUpdateException: (conn=12) You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '"job_title","experience_level","employment_type") VALUES (?,?,?)' at line 1
Эта ошибка касается только первой таблицы, все таблицы имеют одинаковый тип
Я попытался перезаписать схему еще раз, используя приведение в pyspark, но это не сработало
это мой код для чтения из postgresql
def mariadb_loader():
try:
spark = SparkSession.builder \
.appName('mariadbloader') \
.config('spark.jars',
'/home/basel/main/Grad_proj/jars/mariadb-java-client-3.4.1.jar,/home/basel/main/Grad_proj/jars/postgresql-42.6.0.jar') \
.getOrCreate()
try:
df = spark.read.jdbc(
url="jdbc:postgresql://localhost:5423/airflow",
table='public.staging_data',
properties=psql_connection_properties
)
mariadb_dimensions_loader(df)
except Exception as e:
print(f"Error while reading from PostgreSQL or loading dimensions: {e}")
except Exception as e:
print(f"Error in mariadb_loader: {e}")
а это код для записи в mariadb
def mariadb_dimensions_loader(df):
try:
job_dim = df.select(
col("job_title").cast(StringType()),
col("experience_level").cast(StringType()),
col("employment_type").cast(StringType())
)
# Employee Dimension
employee_dim = df.select(col("employee_residence").cast(StringType()))
# Company Dimension
company_dim = df.select(
col("company_location").cast(StringType()),
col("company_size").cast(StringType()),
col("remote_ratio").cast(IntegerType())
)
# Currency Dimension
currency_dim = df.select(col("salary_currency").cast(StringType()))
tables = {
'job_dim': job_dim,
'employee_dim': employee_dim,
'company_dim': company_dim,
'currency_dim': currency_dim
}
for table_name ,data in tables.items():
try:
data.write.jdbc(
url="jdbc:mariadb://127.0.0.1:3306/main",
table=table_name,
mode='append',
properties=maria_connection_properties
)
except Exception as e:
print(f"Error while writing {table_name} to MariaDB: {e}")
except Exception as e:
print(f"Error in mariadb_dimensions_loader: {e}")
Подробнее здесь: [url]https://stackoverflow.com/questions/79071016/data-pipeline-issues[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия