Spark Databricks: Stream-Stream LeftOuter join returns an empty result ⇐ Python
Databricks, with Delta Live Tables, Spark 3.4
I have a streaming dataframe (let's call it "original") containing some records. I filter this table on some conditions, modify some column values, and get a new "modified" dataframe.
I want to merge these two dataframes so that the records in "modified" replace the corresponding records in "original". My current approach is to "subtract" the modified dataframe from the original and then union the result with the "modified" dataframe.
I have an ID field for each of the records.
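To make the intended result concrete, here is a minimal plain-Python model (not Spark code) of the key-based merge: drop every original row whose `ID` appears in "modified" (what a left anti join on `ID` computes), then append the modified rows. It also builds the same anti join as a left outer join followed by an "is NULL" filter, which is the workaround the question tries with streaming dataframes. The column names `ID`/`ID_mod` come from the question; the helper names and sample values are hypothetical.

```python
# Plain-Python model of "replace original records by ID" (not Spark).
# Rows are dicts; ID / ID_mod follow the question, everything else is illustrative.

def left_anti(left, right, lkey, rkey):
    """Rows of `left` whose key has no match in `right`."""
    right_keys = {r[rkey] for r in right}
    return [l for l in left if l[lkey] not in right_keys]


def left_anti_via_outer(left, right, lkey, rkey):
    """Same rows, built as a left outer join followed by an IS NULL filter."""
    joined = []
    for l in left:
        matches = [r for r in right if r[rkey] == l[lkey]]
        if matches:
            joined.extend((l, r) for r in matches)
        else:
            joined.append((l, None))  # unmatched left row: right side is NULL
    return [l for l, r in joined if r is None]


def to_original_schema(row):
    """Copy a modified row, renaming ID_mod back to ID so the union lines up."""
    out = dict(row)
    out["ID"] = out.pop("ID_mod")
    return out


def replace_by_id(original, modified):
    """Remove original rows that were modified, then union the modified rows."""
    kept = left_anti(original, modified, "ID", "ID_mod")
    return kept + [to_original_schema(m) for m in modified]
```

For example, with `original = [{"ID": 1, "v": "a"}, {"ID": 2, "v": "b"}]` and `modified = [{"ID_mod": 2, "v": "B"}]`, `replace_by_id` keeps row 1 and replaces row 2 with its modified version.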
I soon realized that what I want can be done either with PySpark's subtract() function or with a left anti join. However, neither is supported when the right-side dataframe is a streaming one, so I tried to replicate a left anti join with a left outer join:
```python
subtracted = original.join(modified, original['ID'] == modified['ID_mod'], 'leftOuter') \
    .where(modified['ID_mod'].isNull()).select(original['*'])
```

However, I then got an error saying that stream-stream left outer joins are only supported with watermarks and a time range condition. So, following Spark's documentation, I did the following:
```python
import dlt
from pyspark.sql.functions import expr

@dlt.table
def final_records():
    # origTime and modTime are two timestamp columns
    original = dlt.readStream("original_table").withWatermark('origTime', '2 hours')
    modified = dlt.readStream("modified_table").withWatermark('modTime', '3 hours')

    # Should give me original without modified records
    subtracted = original.join(modified, expr("""
        ID = ID_mod AND
        modTime >= origTime AND
        modTime
```
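One plausible reason for an empty result is timing: in a stream-stream left outer join, Spark emits an unmatched left row with NULLs only once the watermark guarantees that no future right row can still match it, and with several watermarked inputs the default policy uses the minimum of their watermarks. The sketch below is a simplified plain-Python model of that rule, not Spark's exact implementation. Because the upper bound of the time-range condition is cut off in the question, `match_window` is a hypothetical stand-in, as are the sample timestamps.

```python
from datetime import datetime, timedelta

# Simplified model (not Spark's implementation) of when a stream-stream
# left outer join may emit an unmatched left row with NULLs.


def stream_watermark(max_event_time, delay):
    """Watermark of one input: latest event time seen minus its delay threshold."""
    return max_event_time - delay


def global_watermark(per_stream):
    """Spark's default multiple-watermark policy takes the minimum."""
    return min(per_stream)


def null_row_emittable(orig_time, watermark, match_window):
    """True once no future right row can satisfy
    origTime <= modTime <= origTime + match_window.
    `match_window` is hypothetical: the question's upper bound is truncated."""
    return watermark > orig_time + match_window


latest = datetime(2024, 1, 1, 13, 0)  # hypothetical newest event time in both streams
wm = global_watermark([
    stream_watermark(latest, timedelta(hours=2)),  # original: 2 hours
    stream_watermark(latest, timedelta(hours=3)),  # modified: 3 hours
])
```

With the question's delays, the effective watermark in this example sits 3 hours behind the newest events, so recent unmatched original rows are still held in state rather than output. Since the watermark only moves forward as new data arrives, those rows can stay pending until later input advances it.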
Source: https://stackoverflow.com/questions/779 ... pty-result