Это мой пример кода:
Код: Выделить всё
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryException
# Name of the table used as the streaming source, and the stream's checkpoint path.
table_location = "hive_metastore.default.test_example"
checkpoint_location = "/tmp/test_checkpoint"

# All four columns are declared nullable.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("BestellID", IntegerType()),
            ("CRDAT", StringType()),
            ("Menge", IntegerType()),
            ("__cmi_ingestion_ts", StringType()),
        )
    ]
)

# Seed the table with a single row, replacing whatever was stored there before.
# NOTE: `spark` is not created anywhere in this snippet; it is expected to
# already exist (presumably injected by the notebook runtime - confirm).
data = [(1, "20240901", 3, "20241002")]
df = spark.createDataFrame(data, schema)
df.write.mode("overwrite").saveAsTable(table_location)

# Handle to the table that was just written (not used further in this snippet).
deltaTable = DeltaTable.forName(spark, table_location)
def mergetoDF(df, batchID):
    """Micro-batch callback that always fails.

    It is passed to ``foreachBatch`` so that the surrounding code can observe
    how an error raised inside the callback is reported by the stream.

    Args:
        df: The micro-batch DataFrame handed over by the stream (unused).
        batchID: Identifier of the micro-batch (unused).

    Raises:
        ValueError: Always, with the message ``"This is an error"``.
    """
    raise ValueError("This is an error")
def test_run():
    """Run the stream once and report how the ``foreachBatch`` failure surfaces.

    Opens ``table_location`` as a streaming source, routes every micro-batch
    to ``mergetoDF`` (which always raises ``ValueError``), starts the query
    with a one-shot trigger and blocks until the query terminates.

    Raises:
        StreamingQueryException: Re-raised when the stream failed for a reason
            other than ``FOREACH_BATCH_USER_FUNCTION_ERROR``, so unrelated
            failures are no longer silently discarded.
    """
    try:
        inbound_data = spark.readStream.format("delta").table(table_location)
        streamQuery = (
            inbound_data.writeStream
            .format("delta")
            .outputMode("append")
            .foreachBatch(mergetoDF)
            .trigger(once=True)
            .option("checkpointLocation", checkpoint_location)
            .start()
        )
        streamQuery.awaitTermination()
    except ValueError:
        # NOTE(review): presumably never reached - the callback runs inside the
        # streaming query, so its ValueError is expected to arrive wrapped in a
        # StreamingQueryException instead. Confirm on the target runtime.
        print('I am a value error')
    except StreamingQueryException as e:
        if 'FOREACH_BATCH_USER_FUNCTION_ERROR' not in str(e):
            # Not the failure this handler knows about: let it propagate
            # instead of swallowing it without any output.
            raise
        print('I am a StreamingQueryException')
# Execute the reproduction as soon as this script / notebook cell runs.
test_run()
Код: Выделить всё
ERROR: Some streams terminated before this command could finish!
На самом деле я хочу написать тесты на pytest и ожидать в них исключений.
Подробнее здесь: https://stackoverflow.com/questions/792 ... h-function