Catching an exception raised in a foreachBatch function

Anonymous

I'm working on Databricks with PySpark Structured Streaming, and I would like to catch an exception that I raise myself inside the function passed to the stream via ".foreachBatch".
This is my example code:


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryException

# `spark` is the SparkSession that Databricks notebooks predefine
table_location = "hive_metastore.default.test_example"
checkpoint_location = "/tmp/test_checkpoint"

schema = StructType([
    StructField("BestellID", IntegerType(), True),
    StructField("CRDAT", StringType(), True),
    StructField("Menge", IntegerType(), True),
    StructField("__cmi_ingestion_ts", StringType(), True)
])

# Create a one-row Delta table that serves as the streaming source
data = [(1, '20240901', 3, '20241002')]
df = spark.createDataFrame(data, schema)
df.write.mode("overwrite").saveAsTable(table_location)

deltaTable = DeltaTable.forName(spark, table_location)

def mergetoDF(df, batchID):
    # Batch function that always fails, to reproduce the problem
    raise ValueError("This is an error")

def test_run():

    try:
        inbound_data = (spark.readStream.format("delta").table(table_location))

        streamQuery = (inbound_data
            .writeStream
            .format("delta")
            .outputMode("append")
            .foreachBatch(mergetoDF)
            .trigger(once=True)
            .option("checkpointLocation", checkpoint_location)
            .start()
        )
        streamQuery.awaitTermination()
    except ValueError:
        # Not triggered in the run shown below: the ValueError arrives
        # wrapped in a StreamingQueryException
        print('I am a value error')
    except StreamingQueryException as e:
        if 'FOREACH_BATCH_USER_FUNCTION_ERROR' in str(e):
            print('I am a StreamingQueryException')

test_run()
I would like to be able to catch errors raised inside the foreachBatch function without any additional error messages being printed. Currently, however, a StreamingQueryException is caught and additional errors are reported:


ERROR: Some streams terminated before this command could finish!
org.apache.spark.api.python.PythonException: Found error inside foreachBatch Python process: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 99, in main
    process(df_ref_id, int(batch_id))
  File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 86, in process
    func(batch_df, batch_id)
  File "/home/spark-19129aeb-c024-45c4-ac47-ef/.ipykernel/54200/command-3034895255467445-2795320295", line 7, in mergetoDF
ValueError: This is an error

at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$6(StreamingForeachBatchHelper.scala:199)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$6$adapted(StreamingForeachBatchHelper.scala:174)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1(StreamingForeachBatchHelper.scala:91)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$dataFrameCachingWrapper$1$adapted(StreamingForeachBatchHelper.scala:80)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:172)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchOptimized$2(ForeachBatchSink.scala:247)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:190)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchOptimized(ForeachBatchSink.scala:247)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:99)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:537)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:91)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.addBatch(MicroBatchExecution.scala:1238)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1456)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.markAndTimeCollectBatch(MicroBatchExecution.scala:1246)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1456)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:462)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:800)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:334)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:205)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:737)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1449)
at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1449)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$6(MicroBatchExecution.scala:760)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1868)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:760)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1813)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:756)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:325)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:716)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:74)
at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:175)
at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:729)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:738)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:615)
at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:710)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:671)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:671)
at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:85)
at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:73)
at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.runOneBatch(TriggerExecutor.scala:97)
at org.apache.spark.sql.execution.streaming.SingleBatchExecutor.execute(TriggerExecutor.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:671)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:448)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:454)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:401)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:74)
at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:175)
at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:729)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:738)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:617)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:615)
at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:86)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:381)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:283)
This causes the script to end with the status "Last execution failed".
What I actually want is to write pytest tests that expect these exceptions.
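In the question's test_run, a StreamingQueryException whose text does not contain the marker is caught and then silently dropped. Below is a minimal sketch that treats only the expected foreachBatch failure as handled and re-raises everything else. It assumes, as the question's own check does, that the exception text contains FOREACH_BATCH_USER_FUNCTION_ERROR and the original Python error line. The helper names are hypothetical, and to keep the sketch runnable without Spark it catches Exception rather than importing StreamingQueryException.

```python
from typing import Callable

# Error class the question's own check looks for in the exception text (assumption
# carried over from the question, not a documented parsing contract).
FOREACH_BATCH_ERROR_CLASS = "FOREACH_BATCH_USER_FUNCTION_ERROR"


def is_expected_foreach_batch_error(exc: BaseException, expected: str) -> bool:
    """True if the exception text carries the foreachBatch error class and the expected error line."""
    text = str(exc)
    return FOREACH_BATCH_ERROR_CLASS in text and expected in text


def await_expecting_batch_error(await_termination: Callable[[], object], expected: str) -> bool:
    """Block on the query; swallow only the expected foreachBatch failure, re-raise anything else.

    Returns True if the expected failure was seen, False if the query finished cleanly.
    """
    try:
        await_termination()
    except Exception as exc:  # with PySpark, this is where a StreamingQueryException lands
        if is_expected_foreach_batch_error(exc, expected):
            return True
        raise  # unrelated failures are not silently dropped
    return False
```

On Databricks this would be called as await_expecting_batch_error(streamQuery.awaitTermination, "ValueError: This is an error"). It only changes what the Python code does with the exception; as the question shows, catching the exception in Python does not stop Databricks from reporting the failed stream.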
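The trace shows the batch function running inside foreach_batch_worker.py ("Found error inside foreachBatch Python process"), so by the time the error reaches the caller it is wrapped. One way to get the original ValueError into a pytest assertion is to call the batch function directly, without a stream. A minimal sketch, assuming pytest is installed; StubBatch is a hypothetical placeholder for the micro-batch DataFrame, used only so the sketch runs without Spark.

```python
import pytest


def mergetoDF(df, batchID):
    """Same batch function as in the question: it always fails."""
    raise ValueError("This is an error")


class StubBatch:
    """Hypothetical stand-in for the micro-batch DataFrame. A real suite would
    build a small static DataFrame with spark.createDataFrame(...) in a fixture."""


def test_mergetoDF_raises_value_error():
    # Called directly (no stream, no foreachBatch worker), the original ValueError
    # propagates unwrapped, so pytest.raises can match its type and message.
    with pytest.raises(ValueError, match="This is an error"):
        mergetoDF(StubBatch(), 0)
```

For an end-to-end check through the stream itself, the exception to expect is the wrapper, e.g. with pytest.raises(StreamingQueryException, match="FOREACH_BATCH_USER_FUNCTION_ERROR"): streamQuery.awaitTermination(). Here match is applied with re.search to the string form of the exception, so this relies on the same assumption about the exception text as the question's check.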

More details here: https://stackoverflow.com/questions/792 ... h-function