контекст < /strong> < /p>
- Бэкэнд: azure blob storage < /li>
libreries: deltalake 1.1.1 (pithon), 1 (1.1.1. LazyFrame/DataFrame) - Цель: idempotent upsert (повторное управление тем же входом не должен создавать новые строки)
Delta Table Shema
Код: Выделить всё
Schema(
[Field(area_type_code, PrimitiveType("string"), nullable=True),
Field(map_code, PrimitiveType("string"), nullable=True),
Field(fuel, PrimitiveType("string"), nullable=True),
Field(datetime, PrimitiveType("timestamp_ntz"), nullable=True),
Field(period_name, PrimitiveType("string"), nullable=True),
Field(period_granularity, PrimitiveType("string"), nullable=True),
Field(power, PrimitiveType("double"), nullable=True),
Field(energy, PrimitiveType("double"), nullable=True)]
)
[*] Разделитель источника на кусочки (я пробовал 2M и 10M строки).
Для каждого чанка перезагрузить таблицу дельта (так что вставки/обновления из предыдущих chunks is visible). → when_matched_update → when_not_matched_insert: < /li>
< /ul>
Код: Выделить всё
merge_results = delta_table.merge(
source=df_chunk,
predicate=merge_predicate,
source_alias='source',
target_alias='target',
writer_properties=writer_properties,
streamed_exec=True,
).when_matched_update(
predicate=match_predicate,
updates=update_mapping
).when_not_matched_insert(
updates=insert_mapping
).execute()
< /code>
Мои предикаты и отображения выглядят как: < /p>
Merge Predicate: target.area_type_code = source.area_type_code и target.map_code = source.map_code и target.fuel = source.fuel.dateTime = source.dateTime и target_gran_gran_grainder_graniod_gran_granistre source.period_granularity
Сопоставьте предикат: target.power! = Source.power или target.Energy! = Source.Energy
Сопоставление обновления: {'power': 'Source.power', 'Energy': 'Source.Energy'}
en orting at ets et in ets et in ets et in et en ortapping {'period_name': 'source.period_name', 'period_granularity': 'source.period_granularity', 'area_type_code': 'source.area_type_code', 'energy': 'source.energy', 'power': 'source.power', 'map_code': 'source.map_code', 'datetime': 'Source.DateTime', 'fuel': 'Source.fuel'}
Мое ограничение состоит в том, что предикат слияния определяет, существует ли запись в источнике или нет в цели; Предикат соответствия - это то, что решает, должна ли уже обновлять уже существующую запись или нет, и сопоставления в основном указывают, какие значения из источника должны в конечном итоге в конечном итоге, в каких столбцах из цели. В первый раз создается таблица Delta, а общее количество строк составляет 10 240,472. Это соответствует количеству строк в входном рамке DataFrame. Когда я запускаю его снова - те же исходные данные, без изменений - я вижу некоторые вставки в соответствии с словарем, нанесенным методом выполнения (Tablemerger). Это также соответствует количеству строк в таблице Delta после того, как я загружаю его в DataFrame и получаю количество строк. Я следит за тем, чтобы у меня не было никаких значений NULL или NAN во всех столбцах, используемых в предикате MERGE (то есть столбцы PK, если хотите). < /P>
'num_source_rows': 240472,
'num_target_rows_inserted': 29782,
'num_target_rows_updated': 4429,
'num_target_rows_deleted': 0,
'num_target_rows_copied': 471766,
'num_output_rows': 505977,
'num_target_files_scanned': 21,
'num_target_files_skipped_during_scan': 0,
'num_target_files_added': 20,
'num_target_files_removed': 18,
< /code>
Я загружаю таблицу Delta в DataFrame Polars или Pandas, и я вижу дубликаты. Я даже зашел так далеко, что запрашивал и загружал строки для дублированного ключа и сравниваю значения для каждого столбца, и каждая строка и никаких различий не обнаружено. Являются ли:
Что-нибудь в моей логике слияния/совпадений за то, что они идентифицируют upsert? (например, нормализация/точность), которую я мог бы пропустить? Safeguard? Рад предоставить более подробную информацию.
Подробнее здесь: https://stackoverflow.com/questions/797 ... ake-python