Гармонизация данных, распределенных по нескольким строкам с повторяющимися значениями для некоторых ключей.Python

Программы на Python
Ответить
Anonymous
 Гармонизация данных, распределенных по нескольким строкам с повторяющимися значениями для некоторых ключей.

Сообщение Anonymous »


Я сталкиваюсь с тем, что данные разбросаны по нескольким столбцам.

Некоторые строки содержат повторяющиеся данные для ключа. Некоторые семантически схожие данные Другие новые свежие данные (иногда только для разных ключей, но иногда и для одного и того же ключа)

Какая стратегия могла бы стать хорошей, чтобы сделать эти данные доступными? Я хочу иметь одну строку для ключа.

Я пытаюсь работать с данными о компаниях. Это из ОРБИСа. Для одного первичного ключа несколько строк содержат значения, которые его описывают.

В некоторых случаях речь идет о кодах и классификациях NACE, в других случаях (как показано ниже) речь идет об идентификационных номерах (налоговых и других).

Каков отличный способ гармонизировать данные (в несколько общем виде)?

Я изучил:
[*]collect_list(struct(для каждого негруппируемого ключевого столбца)) [*]array_compact(collect_set) (для каждого столбца-негруппирующего ключа) [*]объединение первой агрегации с пользовательской функцией, которая JSON обрабатывает данные и пытается сопоставить схожие значения. Однако сохраняется только первое вхождение ключа (а иногда мне нужно несколько).
Ниже вы найдете код воспроизводимого примера

В pandas структура выглядит следующим образом:

импортировать панд как pd d = pd.DataFrame( [ { "bv_d_id_number": "XX00000000000", "national_id_number": "XX00000000000", "national_id_label": "Европейский номер плательщика НДС", "national_id_type": "Европейский номер плательщика НДС", «номер_торгового_регистра»: Нет, "vat_per_tax_number": Нет, "european_vat_number": Нет, "lei_legal_entity_identifier": Нет, «статистический_номер»: нет, «other_company_id_number»: Нет, «ip_identification_number»: Нет, «ip_identification_label»: Нет, "_13": Нет, "ticker_symbol": Нет, "isin_number": Нет }, { "bv_d_id_number": "XX00000000000", "national_id_number": "00000000000", "national_id_label": "ИНН", "national_id_type": "ИНН", «номер_торгового_регистра»: Нет, "vat_per_tax_number": Нет, "european_vat_number": Нет, "lei_legal_entity_identifier": Нет, «статистический_номер»: нет, «other_company_id_number»: Нет, «ip_identification_number»: Нет, «ip_identification_label»: Нет, "_13": Нет, "ticker_symbol": Нет, "isin_number": Нет }, { "bv_d_id_number": "XX00000000000", "national_id_number": "AA0000000", "national_id_label": "Номер CCIAA", "national_id_type": "Номер торгового реестра", "trade_register_number": "AA0000000", "vat_per_tax_number": "00000000000", "european_vat_number": "XX00000000000", "lei_legal_entity_identifier": Нет, «статистический_номер»: нет, «other_company_id_number»: Нет, «ip_identification_number»: Нет, «ip_identification_label»: Нет, "_13": Нет, "ticker_symbol": Нет, "isin_number": Нет }, { "bv_d_id_number": "XX00000000000", "national_id_number": "00000000000", "national_id_label": "Фискальный кодекс", "national_id_type": "Номер НДС/налога", «номер_торгового_регистра»: Нет, "vat_per_tax_number": "00000000000", "european_vat_number": Нет, "lei_legal_entity_identifier": Нет, «статистический_номер»: нет, «other_company_id_number»: Нет, «ip_identification_number»: Нет, «ip_identification_label»: Нет, "_13": Нет, "ticker_symbol": Нет, "isin_number": Нет }, { "bv_d_id_number": "XX00000000000", "national_id_number": "00000000000", "national_id_label": "Партита IVA", "national_id_type": "Номер НДС/налога", «номер_торгового_регистра»: Нет, "vat_per_tax_number": Нет, "european_vat_number": Нет, "lei_legal_entity_identifier": Нет, «статистический_номер»: нет, «other_company_id_number»: Нет, «ip_identification_number»: Нет, «ip_identification_label»: Нет, "_13": Нет, "ticker_symbol": Нет, "isin_number": Нет } ] ) Функции очистки приведены ниже:

из pyspark.sql import SparkSession из функций импорта pyspark.sql как F из pyspark.sql импортировать DataFrame из pyspark.sql.functions импортировать UDF spark = SparkSession.builder.appName("orbis").master("local[2]").getOrCreate() sdf = spark.createDataFrame(dxx.fillna(-1).fillna('NULL')) Collect_as_struct_list(sdf, grouping_keys=["bv_d_id_number"], mode="context").show() корень |-- bv_d_id_number: строка (обнуляемое = true) |-- значения_столбца: массив (обнуляемый = ложь) | |-- элемент: структура (содержитNull = false) | | |-- national_id_number: строка (обнуляемое = true) | | |-- national_id_label: строка (обнуляемое = true) | | |-- national_id_type: строка (обнуляемое = true) | | |-- номер_торгового_регистра: строка (обнуляемое = true) | | |-- vat_per_tax_number: строка (обнуляемое = true) | | |-- european_vat_number: строка (обнуляемое = true) | | |-- lei_legal_entity_identifier: строка (обнуляемая = истина) | | |-- статистический_номер: строка (обнуляемое = true) | | |--other_company_id_number: строка (обнуляемое = true) | | |-- ip_identification_number: строка (обнуляемое = true) | | |-- ip_identification_label: строка (обнуляемое = true) | | |-- _13: логическое значение (обнуляемое = true) | | |--ticker_symbol: строка (обнуляемое = true) | | |-- isin_number: строка (обнуляемое = true) Collect_as_struct_list(sdf, grouping_keys=["bv_d_id_number"], mode="individual_set").show() корень |-- bv_d_id_number: строка (обнуляемое = true) |-- national_id_number: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- national_id_label: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- national_id_type: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- номер_торгового_регистра: массив (обнуляемый = ложь) | |-- элемент: строка (содержитNull = false) |-- vat_per_tax_number: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- европейский_ват_номер: массив (обнуляемый = ложь) | |-- элемент: строка (содержитNull = false) |-- lei_legal_entity_identifier: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- статистический_номер: массив (обнуляемый = ложь) | |-- элемент: строка (содержитNull = false) |--other_company_id_number: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- ip_identification_number: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- ip_identification_label: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) |-- _13: массив (обнуляемый = ложь) | |-- элемент: логическое значение (содержитNull = false) |-- тикер_символ: массив (обнуляемый = ложь) | |-- элемент: строка (содержитNull = false) |-- isin_number: массив (обнуляемый = false) | |-- элемент: строка (содержитNull = false) защита Collect_as_struct_list( дф: DataFrame, групповые_ключи: список, include_columns: список = Нет, имя_структуры: str = "значения_столбца", режим="контекст", ) -> Кадр данных: если не isinstance(grouping_keys, list): поднять ValueError("grouping_keys должен быть списком имен столбцов") если не все (ключ в df.columns для ключа в grouping_keys): поднять ValueError("Все ключи в grouping_keys должны присутствовать в DataFrame") если включенные_столбцы имеют значение Нет: включенные_столбцы = [ имя_столбца для имени_столбца в df.columns, если имя_столбца отсутствует в grouping_keys ] elif не isinstance(included_columns, list) или не все( имя_столбца в df.columns для имени_столбца во включенных_столбцах ): поднять ValueError( «included_columns должен быть списком допустимых имен столбцов из DataFrame» ) если режим == "контекст": struct_cols = F.struct(*included_columns) агрегированный_df = df.groupBy(*grouping_keys).agg( F.collect_list(struct_cols).alias(struct_name) ) если режим == "индивидуальный_набор": агрегаты = [ F.array_compact(F.collect_set(col)).alias(col) для столбца в include_columns ] агрегированный_df = df.groupBy(*grouping_keys).agg(*агрегации) вернуть агрегированный_df Защиту Harmonize_data (column_values): # Инициализируем словарь для хранения уникальных значений для каждого ключа конденсированная_информация = {} для строки в columns_values: # Конвертируем каждую строку в словарь запись = row.asDict() для ключа значение в входе.items(): if value: # Проверьте, не равно ли значение None/Null если ключ в конденсированной_информации: # Если ключ уже существует, добавьте значение в список, если оно уникально если значение отсутствует в конденсированной_информации[ключ]: конденсированная_информация[ключ].append(значение) еще: # Инициализируем новый список для этого ключа конденсированная_информация[ключ] = [значение] # Необязательно: конвертируйте списки с одним значением обратно в это значение. для ключа в конденсированной_информации: если len(confused_info[ключ]) == 1: конденсированная_информация[ключ] = конденсированная_информация[ключ][0] вернуть json.dumps(confused_info) Harmonize_udf = udf(harmonize_data, StringType()) dd = Collect_as_struct_list(sdf, grouping_keys=["bv_d_id_number"], mode="context") result_df = dd.withColumn("harmonized_info", Harmonize_udf(dd["column_values"])) Конкретная версия Spark — 3.5
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»