Диагностика дублирующих вставки после слияния/upsert с Deltalake (Python)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Диагностика дублирующих вставки после слияния/upsert с Deltalake (Python)

Сообщение Anonymous »

Я бы очень признателен за вашу помощь с проблемой дублирования, которую я попадаю при использовании Deltalake Merges (Python). < /p>
контекст < /strong> < /p>
  • Бэкэнд: azure blob storage < /li>
    libreries: deltalake 1.1.1 (pithon), 1 (1.1.1. LazyFrame/DataFrame)
  • Цель: idempotent upsert (повторное управление тем же входом не должен создавать новые строки)

Delta Table Shema

Код: Выделить всё

Schema(
[Field(area_type_code, PrimitiveType("string"), nullable=True),
Field(map_code, PrimitiveType("string"), nullable=True),
Field(fuel, PrimitiveType("string"), nullable=True),
Field(datetime, PrimitiveType("timestamp_ntz"), nullable=True),
Field(period_name, PrimitiveType("string"), nullable=True),
Field(period_granularity, PrimitiveType("string"), nullable=True),
Field(power, PrimitiveType("double"), nullable=True),
Field(energy, PrimitiveType("double"), nullable=True)]
)
подход upsert (за кусок)

[*] Разделитель источника на кусочки (я пробовал 2M и 10M строки).
Для каждого чанка перезагрузить таблицу дельта (так что вставки/обновления из предыдущих chunks is visible). → when_matched_update → when_not_matched_insert: < /li>
< /ul>

Код: Выделить всё

merge_results = delta_table.merge(
source=df_chunk,
predicate=merge_predicate,
source_alias='source',
target_alias='target',
writer_properties=writer_properties,
streamed_exec=True,
).when_matched_update(
predicate=match_predicate,
updates=update_mapping
).when_not_matched_insert(
updates=insert_mapping
).execute()
< /code>
Мои предикаты и отображения выглядят как: < /p>
Merge Predicate: target.area_type_code = source.area_type_code и target.map_code = source.map_code и target.fuel = source.fuel.dateTime = source.dateTime и target_gran_gran_grainder_graniod_gran_granistre source.period_granularity 
Для предиката слияния я также пытался договориться о разделах, обнаруженных в кунке, например, и в Target.period_granulity в («часовой», «ежедневно») и target.area_type_code in ('bzn') . Значения поступают от различия чанка.
Сопоставьте предикат: target.power! = Source.power или target.Energy! = Source.Energy
Сопоставление обновления: {'power': 'Source.power', 'Energy': 'Source.Energy'}
en orting at ets et in ets et in ets et in et en ortapping {'period_name': 'source.period_name', 'period_granularity': 'source.period_granularity', 'area_type_code': 'source.area_type_code', 'energy': 'source.energy', 'power': 'source.power', 'map_code': 'source.map_code', 'datetime': 'Source.DateTime', 'fuel': 'Source.fuel'}
Мое ограничение состоит в том, что предикат слияния определяет, существует ли запись в источнике или нет в цели; Предикат соответствия - это то, что решает, должна ли уже обновлять уже существующую запись или нет, и сопоставления в основном указывают, какие значения из источника должны в конечном итоге в конечном итоге, в каких столбцах из цели. В первый раз создается таблица Delta, а общее количество строк составляет 10 240,472. Это соответствует количеству строк в входном рамке DataFrame. Когда я запускаю его снова - те же исходные данные, без изменений - я вижу некоторые вставки в соответствии с словарем, нанесенным методом выполнения (Tablemerger). Это также соответствует количеству строк в таблице Delta после того, как я загружаю его в DataFrame и получаю количество строк. Я следит за тем, чтобы у меня не было никаких значений NULL или NAN во всех столбцах, используемых в предикате MERGE (то есть столбцы PK, если хотите). < /P>
'num_source_rows': 240472,
'num_target_rows_inserted': 29782,
'num_target_rows_updated': 4429,
'num_target_rows_deleted': 0,
'num_target_rows_copied': 471766,
'num_output_rows': 505977,
'num_target_files_scanned': 21,
'num_target_files_skipped_during_scan': 0,
'num_target_files_added': 20,
'num_target_files_removed': 18,
< /code>
Я загружаю таблицу Delta в DataFrame Polars или Pandas, и я вижу дубликаты. Я даже зашел так далеко, что запрашивал и загружал строки для дублированного ключа и сравниваю значения для каждого столбца, и каждая строка и никаких различий не обнаружено. Являются ли:
Что-нибудь в моей логике слияния/совпадений за то, что они идентифицируют upsert? (например, нормализация/точность), которую я мог бы пропустить? Safeguard? Рад предоставить более подробную информацию.

Подробнее здесь: https://stackoverflow.com/questions/797 ... ake-python
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Диагностика исключения IronPython StackOverflowException
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Диагностика памяти BenchmarkDotNet
    Anonymous » » в форуме C#
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Диагностика вызова блокировки SFTP в libssh2
    Anonymous » » в форуме C++
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • App Flutter Show Grey Screen в режиме выпуска, но отлично работает в режиме отладки «Диагностика Пропертиза »
    Anonymous » » в форуме Android
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Linqtodb Помощь, необходимая для слияния и вторичных вставки
    Anonymous » » в форуме C#
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»