Оптимизация кода для сведения данных о показателях метарекламы в SparkPython

Программы на Python
Ответить
Anonymous
 Оптимизация кода для сведения данных о показателях метарекламы в Spark

Сообщение Anonymous »

У меня есть два искровых скрипта: первый, поскольку бронзовый скрипт должен формировать данные из тем Kafka, каждая тема содержит данные рекламной платформы (tiktok_insights, Meta_insights, google_insights). Структура такая же,

Код: Выделить всё

( id, campaign_id, platform, duration_type ENUM[daily,monthly], breakdown_type ENUM[None,age,gender,country], insights: JSON )
Поле статистики содержит столбцы показателей, такие как клики, показы, а также разбивочное значение, например страна: «США». Теперь с обычными метриками все в порядке, но в Meta есть некоторые сложные типы столбцов, например, действия с данными типа

Код: Выделить всё

[{activity_type: 'like', value: '1'}] or

[{activity_type: 'offsite_conversion.fb_pixel_add_payment_info', value: '1'}],
Другими являются Cost_per_action_type, unique_actions, Cost_per_unique_action_type, action_values, конверсии_значения, конверсии.
Мне нужно сгладить это, я могу предоставить некоторую статическую карту, например

Код: Выделить всё

{
like: { actions: 'like', cost_per_action_type: 'cost_per_like' },
'offsite_conversion.fb_pixel_add_payment_info': { actions:   'website_add_payment_info', cost_per_action_type: 'cost_per_website_add_payment_info' },
'app_custom_event.fb_mobile_add_payment_info': { actions: 'in_app_add_payment_info', cost_per_action_type: 'cost_per_in_app_add_payment_info' }
}
У меня есть следующий код, но он не работает с картой, а просто выравнивает шаблон.

Код: Выделить всё

META_METRIC_FIELDS = {
"actions": "",
"unique_actions": "unique_",
"cost_per_action_type": "cost_per_",
"cost_per_unique_action_type": "unique_cost_per_",
"outbound_clicks": "",
"outbound_clicks_ctr": "",
"cost_per_outbound_click": "",
"cost_per_unique_outbound_click": "",
"video_time_watched_actions": "",
"video_play_actions": "",
"video_p25_watched_actions": "",

"video_p50_watched_actions": "",
"video_p75_watched_actions": "",
"video_p95_watched_actions": "",
"video_p100_watched_actions": "",

"cost_per_2_sec_continuous_video_view": "",
"cost_per_15_sec_video_view": "",
"video_avg_time_watched_actions": "",
"cost_per_30_sec_video_view": ""

}

def flatten_meta_array_metrics(df):
df.show(1)
insights_col = "insights"
for field, prefix in META_METRIC_FIELDS.items():
# Get unique action_types present in the field across the dataset (optional optimization)
# Otherwise, use a fixed list or schema metadata
full_field = f"{insights_col}.{field}"
if field not in df.schema[insights_col].dataType.names:
continue  # Skip if not present in schema

flat_map_col = f"{field}_flat_map"

# action_type_col = f"{insights_col}.{field}"
df = df.withColumn(
flat_map_col,
F.when(
F.col(full_field).isNotNull(),
F.expr(f"""
map_from_entries(
filter(transform({full_field}, x -> struct(
CASE WHEN x.action_type IS NOT NULL THEN '{prefix}' || x.action_type ELSE NULL END as key,
x.value as value
)), x -> x.key IS NOT NULL)
)
""")
)
)
# )

keys = (
df.select(flat_map_col)
.rdd.flatMap(lambda row: list(row[0].keys()) if row[0] else [])
.distinct()
.collect()
)

key_map = {key: key.replace(".", "_") for key in keys}

# Add new fields into insights struct
for raw_key, safe_key in key_map.items():
df = df.withColumn(
insights_col,
F.col(insights_col).withField(safe_key, F.col(flat_map_col).getItem(raw_key))
)

# Drop original complex array field and the map
df = df.withColumn(insights_col, F.col(insights_col).dropFields(field))
df = df.drop(flat_map_col)
df.show(1)
return df
Поэтому генерируйте данные типа

Код: Выделить всё

{'like': 1, 'offsite_conversion.fb_pixel_add_payment_info': 1, app_custom_event.fb_mobile_add_payment_info: 1}
Какие данные я предпочитаю

Код: Выделить всё

{'like': 1, 'website_add_payment_info': 1, in_app_add_payment_info: 1}
Кроме того, код слишком тяжелый и занимает 10-20 минут в локальной системе даже с десятью строками тестовых данных. Для тысяч строк это может занять час даже на сервере

Подробнее здесь: https://stackoverflow.com/questions/798 ... a-in-spark
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»