Как оптимизировать потоковую передачу Spark Structured с помощью Delta Table для эффективного сбора данных?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как оптимизировать потоковую передачу Spark Structured с помощью Delta Table для эффективного сбора данных?

Сообщение Anonymous »

Я использую потоковую потоковую передачу Apache Spark для обработки входящих данных о событиях Debezium в партиях каждые 1 час (время расписания). Текущая реализация работает, но она кажется медленной и не удобной для производительности. < /P>
def process_batch(batch_df, batch_id, key_column_name='id'):
global cached_schema, cached_field_info, select_cols, ordered_fields, future_data, is_halfway, existing_data

if batch_df.isEmpty():
return

if is_cached_schema() and cached_schema is None:
cached_schema, cached_field_info = load_cached_schema()

if not cached_schema:
data_json = batch_df.first()["value"]
cached_schema, _, cached_field_info = create_dynamic_schema(data_json)
save_cached_schema(cached_schema, cached_field_info)

parsed_batch = batch_df.select(from_json(col("value"), cached_schema).alias("data"))
if not select_cols or not ordered_fields:
select_cols, ordered_fields = generate_select_statements(cached_schema, cached_field_info)

if key_column_name not in ordered_fields:
raise ValueError(f"Key column '{key_column_name}' not found in schema fields: {ordered_fields}")

parsed_data = parsed_batch.select(select_cols)
sorted_events = parsed_data.orderBy("timestamp")

for event in sorted_events.collect():
operation = event["operation"]
try:
existing_data = spark.read.format("delta").load(minio_output_path)
delta_table = DeltaTable.forPath(spark, minio_output_path)
except AnalysisException:
if operation == "c":
initial_insert_operation_processing(event, ordered_fields, key_column_name)
continue

if operation == "c":
insert_operation_processing(event, ordered_fields, delta_table, key_column_name)
elif operation == "u":
update_operation_processing(event, ordered_fields, delta_table, key_column_name)
elif operation == "d":
delete_operation_processing(event, ordered_fields, delta_table, key_column_name)

print(f"Data count {existing_data.count()}")

def initial_insert_operation_processing(event, fields_ordered, key_column_name):
row_data = {field: event[f"after_{field}"] for field in fields_ordered}
row_data["timestamp"] = event["timestamp"]
insert_df = spark.createDataFrame([Row(**row_data)])

print('Inserting record:', row_data[key_column_name])
insert_df.write.format("delta").mode("append").save(minio_output_path)

def insert_operation_processing(event, fields_ordered, delta_table, key_column_name):
row_data = {field: event[f"after_{field}"] for field in fields_ordered}
row_data["timestamp"] = event["timestamp"]
insert_df = spark.createDataFrame([Row(**row_data)])

print('Inserting record:', row_data[key_column_name])
delta_table.alias("target").merge(
insert_df.alias("source"),
f"target.{key_column_name} = source.{key_column_name}"
).whenNotMatchedInsertAll().execute()

def update_operation_processing(event, fields_ordered, delta_table, key_column_name):
row_data = {field: event[f"after_{field}"] for field in fields_ordered}
update_df = spark.createDataFrame([Row(**row_data)])

print('Updating record:', row_data[key_column_name])

delta_table.alias("target").merge(
update_df.alias("source"),
f"target.{key_column_name} = source.{key_column_name}"
).whenMatchedUpdate(
condition=None,
set={field: f"source.{field}" for field in fields_ordered}
).execute()

def delete_operation_processing(event, fields_ordered, delta_table, key_column_name):
delete_data = spark.createDataFrame([Row(**{key_column_name: event[f'before_{key_column_name}']})])

print('Deleting record:', event[f'before_{key_column_name}'])
delta_table.alias("target").merge(
delete_data.alias("source"),
f"target.{key_column_name} = source.{key_column_name}"
).whenMatchedDelete().execute()
< /code>
потоковой запрос: < /p>
query = df.writeStream \
.foreachBatch(
lambda dataframe, b_id: process_batch(dataframe, id, key_column_name="customerId")) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime=time_process) \
.start()

< /code>
Как я могу оптимизировать этот процесс? Если событие в новой партии зависит от события из предыдущей партии, оно теряется, что приводит к пропущенным данным. Я хотел бы оптимизировать его и следовать передовым методам для эффективного захвата всех изменений из источника и хранения их в таблице дельты в хранении объектов S3 для исторического анализа.

Подробнее здесь: https://stackoverflow.com/questions/794 ... ent-change
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Написание в Delta Table с использованием Spark SQL
    Anonymous » » в форуме Python
    0 Ответы
    1 Просмотры
    Последнее сообщение Anonymous
  • Невозможно выполнить потоковую передачу данных из одной базы данных MySQL в другую через Apache Flink (java.io.StreamCor
    Anonymous » » в форуме JAVA
    0 Ответы
    25 Просмотры
    Последнее сообщение Anonymous
  • Spark Structured Streating MySQL Читать [дублировать]
    Anonymous » » в форуме JAVA
    0 Ответы
    1 Просмотры
    Последнее сообщение Anonymous
  • Spark Structured Streating MySQL Читать [дублировать]
    Anonymous » » в форуме MySql
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous
  • Добавление к Delta Tables в ткани, используя Spark vs без Spark
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»