Databricks не может найти файл csv внутри колеса, которое я установил при запуске из блокнота Databricks.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Databricks не может найти файл csv внутри колеса, которое я установил при запуске из блокнота Databricks.

Сообщение Anonymous »

Я изучаю Spark, поэтому в качестве задачи нам нужно было создать колесо локально, а затем установить его в Databricks (я использую Azure Databricks) и протестировать его, запустив из блокнота Databrick. Эта программа предполагает чтение файла CSV (timezones.csv), включенного в файл колеса. Файл находится внутри колеса (я проверил), а также колесо работает правильно, когда я устанавливаю его и запускаю с локального ПК Jupyter Notebook. Однако, когда я устанавливаю его в блокнот Databricks, он выдает эту ошибку, как вы можете видеть ниже на снимке:
[PATH_NOT_FOUND] Path does not exist: dbfs:/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv. SQLSTATE: 42K03
File , line 7
3 from pyspark.sql import SparkSession
5 spark = SparkSession.builder.getOrCreate()
----> 7 flights_with_utc = aniade_hora_utc(spark, flights_df)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/agregaciones.py:25, in aniade_hora_utc(spark, df)
23 path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
24 #path_timezones = str(Path("resources") / "timezones.csv")
---> 25 timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
27 # Concateno los datos de las columnas del timezones_df ("iata_code","iana_tz","windows_tz"), a la derecha de
28 # las columnas del df original, copiando solo en las filas donde coincida el aeropuerto de origen (Origin) con
29 # el valor de la columna iata_code de timezones.df. Si algun aeropuerto de Origin no apareciera en timezones_df,
30 # las 3 columnas quedarán con valor nulo (NULL)
32 df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function..wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:830, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
828 if type(path) == list:
829 assert self._spark._sc._jvm is not None
--> 830 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
831 elif isinstance(path, RDD):
833 def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception..deco(*a, **kw)
226 converted = convert_exception(e.java_exception)
227 if not isinstance(converted, UnknownException):
228 # Hide where the exception came from that shows a non-Pythonic
229 # JVM exception message.
--> 230 raise converted from None
231 else:
232 raise

Снимок ошибки Databricks 1:
Изображение
Снимок ошибки Databricks 2:
Изображение

Кто-нибудь сталкивался с этой проблемой раньше? Есть ли какое-нибудь решение?
Я пробовал установить файл как с помощью pip, так и из библиотеки, и получил ту же ошибку, также несколько раз перезагружал кластер.
Заранее благодарим за помощь.
Я использую Python 3.11, Pyspark 3.5 и Java 8 и создал колесо локально из PyCharm. Если для ответа вам нужна дополнительная информация, просто спросите, и я предоставлю ее.
Все детали я объяснил выше. Я ожидал, что смогу использовать колесо, которое я создал локально из блокнота Databricks.
Извините, что мой английский не является моим родным языком, и я немного заржавел.Отредактировано для ответа на комментарий:

Можете ли вы перейти к %sh ls
/dbfs/local_disk0/.ephemeral_nfs/cluster_libraries/ python/lib/python3.10/site-packages/motor_ingesta/resources и поделиться содержимым папки как изображением? – Самуэль Демир 11 часов назад

Я просто сделал то, что вы просили, и получил этот результат (файл на самом деле существует, даже если Databricks говорит, что это возможно» не нашел...
Снимок результата предложения
Отредактировано в соответствии с ответом Самуэля Демира:
Может быть, package_data отсутствует при инициализации вашей установки?!
Это то, что я делаю, чтобы добавить свои файлы конфигурации в папку res в
колесо. После этого файлы будут доступны точно по тому же фрагменту кода,
вашему.
setup(
name="daprep",
version=__version__,
author="",
author_email="[email protected]",
description="A short summary of the project",
license="proprietary",
url="",
packages=find_packages("src"),
package_dir={"": "src"},
package_data={"daprep": ["res/**/*"]},
long_description=read("README.md"),
install_requires=read_requirements(Path("requirements.txt")),
tests_require=[
"pytest",
"pytest-cov",
"pre-commit",
],
cmdclass={
"dist": DistCommand,
"test": TestCommand,
"testcov": TestCovCommand,
},
platforms="any",
python_requires=">=3.7",
entry_points={
"console_scripts": [
"main_entrypoint = daprep.main:main_entrypoint",
]
}, )


Мой файл setup.py также содержит строку package _data, здесь вы можете увидеть снимок и код для него. Улавливаете ли вы какие-нибудь другие детали, которые могут иметь здесь отношение? Заранее спасибо.
Снимок моего файла setup.py
from setuptools import setup, find_packages

setup(
name="motor-ingesta",
version="0.1.0",
author="Estrella Adriana Sicardi Segade",
author_email="[email protected]",
description="Motor de ingesta para el curso de Spark",
long_description="Motor de ingesta para el curso de Spark",
long_description_content_type="text/markdown",
url="https://github.com/esicardi",
python_requires=">=3.8",
packages=find_packages(),
package_data={"motor_ingesta": ["resources/*.csv"]})

Отредактировано, чтобы ответить на вопросы:

Можете ли вы загрузить CSV-файл, просто прочитав его в блокнотеклетка? – Самуэль Демир 17 часов назад

Да, я могу прочитать файл вне кода.
Например, я могу распечатать содержимое файла:
%sh cat /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv

"iata_code","iana_tz","windows_tz"
"AAA","Pacific/Tahiti","Hawaiian Standard Time"
"AAB","Australia/Brisbane","E. Australia Standard Time"
"AAC","Africa/Cairo","Egypt Standard Time"
"AAD","Africa/Mogadishu","E. Africa Standard Time"
"AAE","Africa/Algiers","W. Central Africa Standard Time"
"AAF","America/New_York","Eastern Standard Time"
"AAG","America/Sao_Paulo","E. South America Standard Time"
"AAH","Europe/Berlin","W. Europe Standard Time"
"AAI","America/Araguaina","Tocantins Standard Time"
"AAJ","America/Paramaribo","SA Eastern Standard Time"
"AAK","Pacific/Tarawa","UTC+12"
"AAL","Europe/Copenhagen","Romance Standard Time"
"AAM","Africa/Johannesburg","South Africa Standard Time"
"AAN","Asia/Dubai","Arabian Standard Time"
"AAO","America/Caracas","Venezuela Standard Time"
"AAP","Asia/Makassar","Singapore Standard Time"
"AAQ","Europe/Moscow","Russian Standard Time"
"AAR","Europe/Copenhagen","Romance Standard Time"
"AAS","Asia/Jayapura","Tokyo Standard Time"

...
[file continues on until the end]


Можете ли вы поделиться частью своей кодовой базы, чтобы реконструировать
проблему? – Самуэль Демир, 17 часов назад

Что касается пакета (называемого Motor_ingesta), он состоит из трех файлов py: Motor_ingesta.py, agregaciones.py и flujo_diario. .py
Пакет принимает и обрабатывает некоторые файлы JSON с информацией о рейсах в аэропортах США в январе 2023 года. Функция, считывающая timezones.csv, определена внутри agregaciones.py. Функция принимает фрейм данных (эта функция используется для обработки фреймов данных, полученных ранее из файлов JSON), и объединяет его с данными из timezones.csv, сопоставляя коды аэропортов. Позже он использует эту информацию для расчета времени UTC на основе времени отправления (DepTime) и зон UTC. Он добавляет еще один столбец справа от Dataframe, «FlightTime», содержащий время UTC, а затем удаляет добавленные столбцы из timezones.csv. Вот код функции:
def aniade_hora_utc(spark: SparkSession, df: DF) -> DF:
"""
Añade la columna FlightTime en formato UTC al DataFrame de vuelos.

:param spark: Sesión de Spark.
:param df: DataFrame de vuelos.
:return: DataFrame de vuelos con la columna FlightTime en formato UTC.
"""

path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
df_with_flight_time = df_with_tz.withColumn("FlightTime", to_utc_timestamp(concat(
col("FlightDate"), lit(" "),
lpad(col("DepTime").cast("string"), 4, "0").substr(1, 2), lit(":"),
col("DepTime").cast("string").substr(-2, 2)
), col("iana_tz")))

df_with_flight_time = df_with_flight_time.drop("iata_code", "iana_tz", "windows_tz")
return df_with_flight_time


Подробнее здесь: https://stackoverflow.com/questions/782 ... ing-from-a
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»