Проблемы с конвейером данныхPython

Программы на Python
Ответить
Anonymous
 Проблемы с конвейером данных

Сообщение Anonymous »

Я читаю данные из postgrseql, а затем записываю их в модель mariadb с помощью pyspark jdbc, и у меня возникла эта ошибка
Caused by: java.sql.BatchUpdateException: (conn=12) You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '"job_title","experience_level","employment_type") VALUES (?,?,?)' at line 1

Эта ошибка касается только первой таблицы, все таблицы имеют одинаковый тип
Я попытался перезаписать схему еще раз, используя приведение в pyspark, но это не сработало
это мой код для чтения из postgresql
def mariadb_loader():
try:
spark = SparkSession.builder \
.appName('mariadbloader') \
.config('spark.jars',
'/home/basel/main/Grad_proj/jars/mariadb-java-client-3.4.1.jar,/home/basel/main/Grad_proj/jars/postgresql-42.6.0.jar') \
.getOrCreate()

try:
df = spark.read.jdbc(
url="jdbc:postgresql://localhost:5423/airflow",
table='public.staging_data',
properties=psql_connection_properties
)
mariadb_dimensions_loader(df)

except Exception as e:
print(f"Error while reading from PostgreSQL or loading dimensions: {e}")

except Exception as e:
print(f"Error in mariadb_loader: {e}")

а это код для записи в mariadb
def mariadb_dimensions_loader(df):
try:
job_dim = df.select(
col("job_title").cast(StringType()),
col("experience_level").cast(StringType()),
col("employment_type").cast(StringType())
)

# Employee Dimension
employee_dim = df.select(col("employee_residence").cast(StringType()))

# Company Dimension
company_dim = df.select(
col("company_location").cast(StringType()),
col("company_size").cast(StringType()),
col("remote_ratio").cast(IntegerType())
)

# Currency Dimension
currency_dim = df.select(col("salary_currency").cast(StringType()))

tables = {
'job_dim': job_dim,
'employee_dim': employee_dim,
'company_dim': company_dim,
'currency_dim': currency_dim
}

for table_name ,data in tables.items():
try:
data.write.jdbc(
url="jdbc:mariadb://127.0.0.1:3306/main",
table=table_name,
mode='append',
properties=maria_connection_properties
)
except Exception as e:
print(f"Error while writing {table_name} to MariaDB: {e}")

except Exception as e:
print(f"Error in mariadb_dimensions_loader: {e}")


Подробнее здесь: https://stackoverflow.com/questions/790 ... ine-issues
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»