Я использую потоковую потоковую передачу Apache Spark для обработки входящих данных о событиях Debezium в партиях каждые 1 час (время расписания). Текущая реализация работает, но она кажется медленной и не удобной для производительности. < /P>
def process_batch(batch_df, batch_id, key_column_name='id'):
global cached_schema, cached_field_info, select_cols, ordered_fields, future_data, is_halfway, existing_data
if batch_df.isEmpty():
return
if is_cached_schema() and cached_schema is None:
cached_schema, cached_field_info = load_cached_schema()
if not cached_schema:
data_json = batch_df.first()["value"]
cached_schema, _, cached_field_info = create_dynamic_schema(data_json)
save_cached_schema(cached_schema, cached_field_info)
parsed_batch = batch_df.select(from_json(col("value"), cached_schema).alias("data"))
if not select_cols or not ordered_fields:
select_cols, ordered_fields = generate_select_statements(cached_schema, cached_field_info)
if key_column_name not in ordered_fields:
raise ValueError(f"Key column '{key_column_name}' not found in schema fields: {ordered_fields}")
parsed_data = parsed_batch.select(select_cols)
sorted_events = parsed_data.orderBy("timestamp")
for event in sorted_events.collect():
operation = event["operation"]
try:
existing_data = spark.read.format("delta").load(minio_output_path)
delta_table = DeltaTable.forPath(spark, minio_output_path)
except AnalysisException:
if operation == "c":
initial_insert_operation_processing(event, ordered_fields, key_column_name)
continue
if operation == "c":
insert_operation_processing(event, ordered_fields, delta_table, key_column_name)
elif operation == "u":
update_operation_processing(event, ordered_fields, delta_table, key_column_name)
elif operation == "d":
delete_operation_processing(event, ordered_fields, delta_table, key_column_name)
print(f"Data count {existing_data.count()}")
def initial_insert_operation_processing(event, fields_ordered, key_column_name):
row_data = {field: event[f"after_{field}"] for field in fields_ordered}
row_data["timestamp"] = event["timestamp"]
insert_df = spark.createDataFrame([Row(**row_data)])
print('Inserting record:', row_data[key_column_name])
insert_df.write.format("delta").mode("append").save(minio_output_path)
def insert_operation_processing(event, fields_ordered, delta_table, key_column_name):
row_data = {field: event[f"after_{field}"] for field in fields_ordered}
row_data["timestamp"] = event["timestamp"]
insert_df = spark.createDataFrame([Row(**row_data)])
print('Inserting record:', row_data[key_column_name])
delta_table.alias("target").merge(
insert_df.alias("source"),
f"target.{key_column_name} = source.{key_column_name}"
).whenNotMatchedInsertAll().execute()
def update_operation_processing(event, fields_ordered, delta_table, key_column_name):
row_data = {field: event[f"after_{field}"] for field in fields_ordered}
update_df = spark.createDataFrame([Row(**row_data)])
print('Updating record:', row_data[key_column_name])
delta_table.alias("target").merge(
update_df.alias("source"),
f"target.{key_column_name} = source.{key_column_name}"
).whenMatchedUpdate(
condition=None,
set={field: f"source.{field}" for field in fields_ordered}
).execute()
def delete_operation_processing(event, fields_ordered, delta_table, key_column_name):
delete_data = spark.createDataFrame([Row(**{key_column_name: event[f'before_{key_column_name}']})])
print('Deleting record:', event[f'before_{key_column_name}'])
delta_table.alias("target").merge(
delete_data.alias("source"),
f"target.{key_column_name} = source.{key_column_name}"
).whenMatchedDelete().execute()
< /code>
потоковой запрос: < /p>
query = df.writeStream \
.foreachBatch(
lambda dataframe, b_id: process_batch(dataframe, id, key_column_name="customerId")) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime=time_process) \
.start()
< /code>
Как я могу оптимизировать этот процесс? Если событие в новой партии зависит от события из предыдущей партии, оно теряется, что приводит к пропущенным данным. Я хотел бы оптимизировать его и следовать передовым методам для эффективного захвата всех изменений из источника и хранения их в таблице дельты в хранении объектов S3 для исторического анализа.
Подробнее здесь: https://stackoverflow.com/questions/794 ... ent-change
Как оптимизировать потоковую передачу Spark Structured с помощью Delta Table для эффективного сбора данных? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение