AWS Glue и Kafka: как избежать повторной обработки старых записей?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 AWS Glue и Kafka: как избежать повторной обработки старых записей?

Сообщение Anonymous »

Я новичок в потоковой передаче. Я работаю над установкой, в которой у меня есть задание AWS Glue, которое извлекает записи из кластера MSK. Это задание Glue запускается группой обеспечения доступности баз данных Airflow каждые 30 минут. Вот рабочий процесс:
Когда группа обеспечения доступности баз данных запускает задание Glue, она извлекает данные из темы Kafka, обрабатывает их, а затем сохраняет результаты в таблице Glue.
На При следующем триггере DAG (30 минут спустя) я хочу обрабатывать и вставлять только новые записи из темы Kafka — по сути, только данные, полученные с момента последнего запуска.
Я разобрался с основами: извлечение данных из Kafka, его обработка и вставка в таблицы Glue. Но я не уверен, как обеспечить обработку только новых записей при последующих запусках.
это упрощенная версия задания склеивания:

Код: Выделить всё

spark = (
SparkSession
.builder
.appName("Streaming from Kafka")
.config("spark.streaming.stopGracefullyOnShutdown", True)
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
.config("spark.sql.shuffle.partitions", 4)
.getOrCreate()
)

event_logs_df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", f"b-xxx.us-west-2.amazonaws.com:9098") \
.option("subscribe", f"controller_config") \
.option("minOffsetsPerTrigger", 1) \
.option("maxOffsetsPerTrigger", 100) \
.option("maxTriggerDelay", "1ms") \
.option("startingOffsets", f"earliest") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "AWS_MSK_IAM") \
.option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;") \
.option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler") \
.load()

# Define the schema for the JSON structure in the value column
json_schema = StructType([
StructField("in_s3_bucket", StringType(), True),
StructField("in_s3_key", StringType(), True)
])

# Cast the value column to string, then parse the JSON
df_joined = event_logs_df \
.select(
from_json(col("value").cast("string"), json_schema).alias("parsed_value"),
col("timestamp")
) \
.select("parsed_value.in_s3_bucket", "parsed_value.in_s3_key", "timestamp") \
.orderBy(desc("timestamp"))

if spark._jsparkSession.catalog().tableExists(
out_glue_db_name.replace("", ""),
out_glue_table_name.replace("", "")
):
df_joined.select("date", "s3key", "cc", "reason", "companyid", "siteid", "satid", "year") \
.write.format("parquet") \
.mode("append") \
.insertInto(f"{out_glue_db_name}.{out_glue_table_name}")
else:
df_joined.select("cid", "siteid", "satid", "year", "date", "s3key", "cc", "reason") \
.write.partitionBy("cid", "siteid", "satid", "year") \
.format("parquet") \
.mode("overwrite") \
.saveAsTable(
name=f"{out_glue_db_name}.{out_glue_table_name}",
path=f"s3://{out_s3_bucket}/{out_glue_table_name}"

Я бы не хотел вносить какие-либо изменения в метод записи. Как я могу использовать для этого контрольно-пропускные пункты в Кафке? Будет ли это считаться пакетным или потоковым заданием? Каков был бы наиболее эффективный способ решить эту проблему, не внося больших изменений в текущую версию кода?

Подробнее здесь: https://stackoverflow.com/questions/791 ... ld-records
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как вставить данные временных меток в таблицу Iceberg, управляемую AWS Glue, с помощью AWS Firehose?
    Anonymous » » в форуме Python
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Как вставить данные временных меток в таблицу Iceberg, управляемую AWS Glue, с помощью AWS Firehose?
    Anonymous » » в форуме Python
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous
  • AWS Glue: не может определить массив структуры с AWS CDK
    Anonymous » » в форуме Python
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • AWS Glue: не может определить массив структуры с AWS CDK
    Anonymous » » в форуме Python
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • AWS Glue: не может определить массив структуры с AWS CDK
    Anonymous » » в форуме Python
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»