Перехват исключения, возникшего в функции foreachBatchPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Перехват исключения, возникшего в функции foreachBatch

Сообщение Anonymous »

Я работаю над Databricks с помощью структурированной потоковой передачи Pyspark и хотел бы перехватить исключение, созданное мной в функции, переданной в поток как функция ".foreachBatch".
Это мой пример кода:

Код: Выделить всё

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryException

table_location = "hive_metastore.default.test_example"
checkpoint_location = "/tmp/test_checkpoint"

schema = StructType([
StructField("BestellID", IntegerType(), True),
StructField("CRDAT", StringType(), True),
StructField("Menge", IntegerType(), True),
StructField("__cmi_ingestion_ts", StringType(), True)
])

data = [(1, '20240901', 3, '20241002')]
df = spark.createDataFrame(data, schema)
df.write.mode("overwrite").saveAsTable(table_location)

deltaTable = DeltaTable.forName(spark, table_location)

def mergetoDF(df, batchID):
raise ValueError("This is an error")

def test_run():

try:
inbound_data = (spark.readStream.format("delta").table(table_location))

streamQuery = (inbound_data
.writeStream
.format("delta")
.outputMode("append")
.foreachBatch(mergetoDF)
.trigger(once=True)
.option("checkpointLocation", checkpoint_location)
.start()
)
streamQuery.awaitTermination()
except ValueError:
print('I am a value error')
except StreamingQueryException as e:
if 'FOREACH_BATCH_USER_FUNCTION_ERROR' in str(e):
print('I am a StreamingQueryException')

test_run()
Я хотел бы иметь возможность перехватывать ошибки, возникающие внутри функции foreachBatch, без каких-либо дополнительных сообщений об ошибках, выводимых на экран. Но в настоящее время перехватывается исключение StreamingQueryException и сообщается о дополнительных ошибках

Код: Выделить всё

ERROR: Some streams terminated before this command could finish!
приводит к статусу сценария «Последнее выполнение не удалось».
На самом деле я хочу написать тесты pytest и ожидать исключений в тестах .

Подробнее здесь: https://stackoverflow.com/questions/792 ... h-function
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»