Я пытаюсь интегрировать Spark Connect(spark-connect) в свои задания Spark. Для запуска заданий в фоновом режиме я использую сельдерей в сочетании с eventlet с параллелизмом 5. Это отлично работает для кластера Spark, который настроен локально и имеет главный и один рабочий узел.
spark = SparkSession.builder \
.master(settings.SPARK_MASTER) \
.appName(self.__app_name)
Это отлично работает внутри сельдерея.
Я пытался интегрировать Spark Connect в существующие задания
spark = SparkSession.builder.remote("sc://xx.x.xxx.71:15002").getOrCreate()
Это не удается при запуске внутри celery с пулом событий, но работает с одиночным пулом. Я включил журнал отладки для pyspark и обнаружил частичную ошибку импорта, из-за которой мои задания не выполняются
ERROR/MainProcess] Task conn.union[afd002b6-39ca-4c51-b017-2ffd06f51c35] raised unexpected: ImportError("cannot import name 'SparkSession' from partially initialized module 'pyspark.sql.connect.session' (most likely due to a circular import) (C:\\SCAD\\ifp-dataprep\\tenv\\Lib\\site-packages\\pyspark\\sql\\connect\\session.py)")
Traceback (most recent call last):
File "C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\celery\app\trace.py", line 477, in trace_task
R = retval = fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\celery\app\trace.py", line 760, in __protected_call__
return self.run(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\SCAD\ifp-dataprep\django\conn.py", line 26, in union
spark = SparkSession.builder.remote("sc://xx.x.xxx.71:15002").create()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\pyspark\sql\session.py", line 524, in create
from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
ImportError: cannot import name 'SparkSession' from partially initialized module 'pyspark.sql.connect.session' (most likely due to a circular import) (C:\SCAD\ifp-dataprep\tenv\Lib\site-packages\pyspark\sql\connect\session.py)
Что вызывает эту проблему? Это из-за параллелизма? Как я могу это исправить?
import os
os.environ['PYSPARK_LOG_LEVEL'] = 'DEBUG'
from functools import reduce
from pyspark.sql.functions import lit
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
from pyspark.sql import SparkSession
@app.task
def union():
spark = SparkSession.builder.remote("sc://xx.x.xxx.71:15002").create()
adf = spark.read.parquet("s3a://dataprep/datasets/0b1018bd-5558-4932-b98a-46996a97663d.parquet",inferSchema=True)
bdf = spark.read.parquet("s3a://dataprep/datasets/f6bdd428-fb5e-4942-b236-1c8efcc1d2d3.parquet",inferSchema=True)
adf.show()
bdf.count()
common_columns = list(set(adf.columns) & set(bdf.columns))
# Add missing columns to df1 filled with null values
for col in set(bdf.columns) - set(adf.columns):
adf = adf.withColumn(col, lit(None))
# Add missing columns to df2 filled with null values
for col in set(adf.columns) - set(bdf.columns):
bdf = bdf.withColumn(col, lit(None))
# Select only the common columns from each DataFrame
df1_common = adf.select(common_columns)
df2_common = bdf.select(common_columns)
# Union the DataFrames
# result = df1_common.union(df2_common)
dfs = [df1_common,df2_common]
result = reduce(lambda df1, df2: df1.union(df2), dfs)
result.show()
Подробнее здесь: https://stackoverflow.com/questions/784 ... bs-it-thro
Я пытаюсь интегрировать Spark Connect (spark-connect) в свои искровые задания. Выдает ошибку циклического импорта при им ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Что происходит при использовании взаимного или циклического (циклического) импорта?
Anonymous » » в форуме Python - 0 Ответы
- 20 Просмотры
-
Последнее сообщение Anonymous
-