Я пытаюсь интегрировать Spark Connect (spark-connect) в свои искровые задания. Выдает ошибку циклического импорта при имPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Я пытаюсь интегрировать Spark Connect (spark-connect) в свои искровые задания. Выдает ошибку циклического импорта при им

Сообщение Anonymous »

Я пытаюсь интегрировать Spark Connect(spark-connect) в свои задания Spark. Для запуска заданий в фоновом режиме я использую сельдерей в сочетании с eventlet с параллелизмом 5. Это отлично работает для кластера Spark, который настроен локально и имеет главный и один рабочий узел.
spark = SparkSession.builder \
.master(settings.SPARK_MASTER) \
.appName(self.__app_name)

Это отлично работает внутри сельдерея.
Я пытался интегрировать Spark Connect в существующие задания
spark = SparkSession.builder.remote("sc://xx.x.xxx.71:15002").getOrCreate()

Это не удается при запуске внутри celery с пулом событий, но работает с одиночным пулом. Я включил журнал отладки для pyspark и обнаружил частичную ошибку импорта, из-за которой мои задания не выполняются
ERROR/MainProcess] Task conn.union[afd002b6-39ca-4c51-b017-2ffd06f51c35] raised unexpected: ImportError("cannot import name 'SparkSession' from partially initialized module 'pyspark.sql.connect.session' (most likely due to a circular import) (C:\\SCAD\\ifp-dataprep\\tenv\\Lib\\site-packages\\pyspark\\sql\\connect\\session.py)")
Traceback (most recent call last):
File "C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\celery\app\trace.py", line 477, in trace_task
R = retval = fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\celery\app\trace.py", line 760, in __protected_call__
return self.run(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\SCAD\ifp-dataprep\django\conn.py", line 26, in union
spark = SparkSession.builder.remote("sc://xx.x.xxx.71:15002").create()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\pyspark\sql\session.py", line 524, in create
from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
ImportError: cannot import name 'SparkSession' from partially initialized module 'pyspark.sql.connect.session' (most likely due to a circular import) (C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\pyspark\sql\connect\session.py)

Что вызывает эту проблему? Это из-за параллелизма? Как я могу это исправить?
import os
os.environ['PYSPARK_LOG_LEVEL'] = 'DEBUG'
from functools import reduce
from pyspark.sql.functions import lit

from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

from pyspark.sql import SparkSession

@app.task
def union():
spark = SparkSession.builder.remote("sc://xx.x.xxx.71:15002").create()
adf = spark.read.parquet("s3a://dataprep/datasets/0b1018bd-5558-4932-b98a-46996a97663d.parquet",inferSchema=True)
bdf = spark.read.parquet("s3a://dataprep/datasets/f6bdd428-fb5e-4942-b236-1c8efcc1d2d3.parquet",inferSchema=True)
adf.show()
bdf.count()
common_columns = list(set(adf.columns) & set(bdf.columns))

# Add missing columns to df1 filled with null values
for col in set(bdf.columns) - set(adf.columns):
adf = adf.withColumn(col, lit(None))

# Add missing columns to df2 filled with null values
for col in set(adf.columns) - set(bdf.columns):
bdf = bdf.withColumn(col, lit(None))

# Select only the common columns from each DataFrame
df1_common = adf.select(common_columns)
df2_common = bdf.select(common_columns)

# Union the DataFrames
# result = df1_common.union(df2_common)
dfs = [df1_common,df2_common]
result = reduce(lambda df1, df2: df1.union(df2), dfs)
result.show()


Подробнее здесь: https://stackoverflow.com/questions/784 ... bs-it-thro
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»