Spark Databricks: Stream-Stream LeftOuter join returns an empty result

Posted by Guest


Environment: Databricks with Delta Live Tables, Spark 3.4

I have a streaming DataFrame (let's call it "original") containing some records. I filter it on some conditions and modify some column values, which gives me a new "modified" DataFrame.

I want to merge these two DataFrames so that the records in "modified" replace the corresponding records in "original". My current approach is to "subtract" the modified DataFrame from the original and then union the result with "modified".
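On static (batch) data, this subtract-then-union merge is roughly `original.join(modified, "ID", "left_anti").unionByName(modified)` in PySpark, assuming both sides use the same column names. The snippet below is a plain-Python model of the same row semantics, not Spark code. The helper names `left_anti_join` and `replace_by_id` and the sample rows are hypothetical, and it assumes both sides key on a shared `ID` column (the question renames it to `ID_mod` on the modified side for the join).

```python
from typing import Dict, List

Row = Dict[str, object]

def left_anti_join(left: List[Row], right: List[Row], key: str = "ID") -> List[Row]:
    """Rows of `left` whose key has no match in `right` (what subtract/left_anti give on batch data)."""
    right_keys = {row[key] for row in right}
    return [row for row in left if row[key] not in right_keys]

def replace_by_id(original: List[Row], modified: List[Row], key: str = "ID") -> List[Row]:
    """Drop original rows that have a modified version, then append the modified rows (the union)."""
    return left_anti_join(original, modified, key) + list(modified)

# Hypothetical sample data: record 2 was modified.
original = [
    {"ID": 1, "status": "new"},
    {"ID": 2, "status": "new"},
    {"ID": 3, "status": "new"},
]
modified = [{"ID": 2, "status": "fixed"}]

merged = replace_by_id(original, modified)
print(sorted(merged, key=lambda r: r["ID"]))
# [{'ID': 1, 'status': 'new'}, {'ID': 2, 'status': 'fixed'}, {'ID': 3, 'status': 'new'}]
```

The order of the union is not significant here, which is why the output is sorted before printing.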

Each record has an ID field.

I soon realized that this can be done with either PySpark's subtract() function or a left anti join. However, neither is supported when the right-side DataFrame is a streaming one, so I tried to replicate a left anti join with a left outer join:

```python
subtracted = original.join(modified, original['ID'] == modified['ID_mod'], 'leftOuter') \
    .where(modified['ID_mod'].isNull()).select(original['*'])
```

However, I then got an error saying that stream-stream left outer joins are only supported with watermarks and a time-range condition. So, following Spark's documentation, I did the following:

```python
import dlt
from pyspark.sql.functions import expr

@dlt.table
def final_records():
    # origTime and modTime are two timestamp columns
    original = dlt.readStream("original_table").withWatermark('origTime', '2 hours')
    modified = dlt.readStream("modified_table").withWatermark('modTime', '3 hours')
    # Should give me original without modified records
    subtracted = original.join(modified, expr("""
        ID = ID_mod AND
        modTime >= origTime AND
        modTime
```
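Setting the streaming restrictions aside, a left outer join followed by an `isNull` filter on the right-side key is a standard way to emulate a left anti join. The plain-Python model below (not Spark code; helper names and sample rows are hypothetical) reuses the question's `ID`/`ID_mod` columns to show that both produce the same rows on static data. It models only *which* rows come out, not *when*: per the Spark documentation, in a stream-stream outer join the NULL-padded rows are emitted with a delay that depends on the watermark delay and the time-range condition.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]

def left_outer_join(left: List[Row], right: List[Row]) -> List[Tuple[Row, Optional[Row]]]:
    """Pair each left row with every right row where ID == ID_mod; unmatched left rows get None."""
    result: List[Tuple[Row, Optional[Row]]] = []
    for l in left:
        matches = [r for r in right if r["ID_mod"] == l["ID"]]
        if matches:
            result.extend((l, r) for r in matches)
        else:
            result.append((l, None))  # plays the role of the NULL-padded right side
    return result

def anti_join_via_outer(left: List[Row], right: List[Row]) -> List[Row]:
    """Models .where(modified['ID_mod'].isNull()).select(original['*'])."""
    return [l for l, r in left_outer_join(left, right) if r is None]

def left_anti_join(left: List[Row], right: List[Row]) -> List[Row]:
    """Reference left anti join: left rows with no matching ID_mod on the right."""
    right_ids = {r["ID_mod"] for r in right}
    return [l for l in left if l["ID"] not in right_ids]

# Hypothetical sample data: record 2 has a modified version.
original = [{"ID": 1, "v": "a"}, {"ID": 2, "v": "b"}, {"ID": 3, "v": "c"}]
modified = [{"ID_mod": 2, "v_mod": "B"}]

print(anti_join_via_outer(original, modified))
# [{'ID': 1, 'v': 'a'}, {'ID': 3, 'v': 'c'}]
print(anti_join_via_outer(original, modified) == left_anti_join(original, modified))
# True
```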

Source: https://stackoverflow.com/questions/779 ... pty-result