Я новичок в потоковой передаче. Я работаю над установкой, в которой у меня есть задание AWS Glue, которое извлекает записи из кластера MSK. Это задание Glue запускается группой обеспечения доступности баз данных Airflow каждые 30 минут. Вот рабочий процесс:
Когда группа обеспечения доступности баз данных запускает задание Glue, она извлекает данные из темы Kafka, обрабатывает их, а затем сохраняет результаты в таблице Glue.
На При следующем триггере DAG (30 минут спустя) я хочу обрабатывать и вставлять только новые записи из темы Kafka — по сути, только данные, полученные с момента последнего запуска.
Я разобрался с основами: извлечение данных из Kafka, его обработка и вставка в таблицы Glue. Но я не уверен, как обеспечить обработку только новых записей при последующих запусках.
это упрощенная версия задания склеивания:
Я бы не хотел вносить какие-либо изменения в метод записи. Как я могу использовать для этого контрольно-пропускные пункты в Кафке? Будет ли это считаться пакетным или потоковым заданием? Каков был бы наиболее эффективный способ решить эту проблему, не внося больших изменений в текущую версию кода?
Я новичок в потоковой передаче. Я работаю над установкой, в которой у меня есть задание AWS Glue, которое извлекает записи из кластера MSK. Это задание Glue запускается группой обеспечения доступности баз данных Airflow каждые 30 минут. Вот рабочий процесс: Когда группа обеспечения доступности баз данных запускает задание Glue, она извлекает данные из темы Kafka, обрабатывает их, а затем сохраняет результаты в таблице Glue. На При следующем триггере DAG (30 минут спустя) я хочу обрабатывать и вставлять только новые записи из темы Kafka — по сути, только данные, полученные с момента последнего запуска. Я разобрался с основами: извлечение данных из Kafka, его обработка и вставка в таблицы Glue. Но я не уверен, как обеспечить обработку только новых записей при последующих запусках. это упрощенная версия задания склеивания: [code]spark = ( SparkSession .builder .appName("Streaming from Kafka") .config("spark.streaming.stopGracefullyOnShutdown", True) .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') .config("spark.sql.shuffle.partitions", 4) .getOrCreate() )
# Define the schema for the JSON structure in the value column json_schema = StructType([ StructField("in_s3_bucket", StringType(), True), StructField("in_s3_key", StringType(), True) ])
# Cast the value column to string, then parse the JSON df_joined = event_logs_df \ .select( from_json(col("value").cast("string"), json_schema).alias("parsed_value"), col("timestamp") ) \ .select("parsed_value.in_s3_bucket", "parsed_value.in_s3_key", "timestamp") \ .orderBy(desc("timestamp"))
[/code] Я бы не хотел вносить какие-либо изменения в метод записи. Как я могу использовать для этого контрольно-пропускные пункты в Кафке? Будет ли это считаться пакетным или потоковым заданием? Каков был бы наиболее эффективный способ решить эту проблему, не внося больших изменений в текущую версию кода?
Используя AWS Firehose для загрузки данных в таблицу Iceberg, управляемую AWS Glue, я не могу вставить данные временных меток.
Firehose
Я пытаюсь вставить данные с помощью следующего скрипта:
json_data = json.dumps(
{
ADF_Record : {
foo : bar...
Используя AWS Firehose для загрузки данных в таблицу Iceberg, управляемую AWS Glue, я не могу вставить данные метки времени.
Firehose
Я пытаюсь вставить данные с помощью следующего скрипта:
json_data = json.dumps(
{
ADF_Record : {
foo : bar ,...
Я пытаюсь создать клейкую таблицу с столбцом, который отображает массив структуры с определенной схемой. Используя экспериментальную конструкцию AWS_GLUE_ALPHA , определение массива структуры возможно только с помощью этого синтаксиса кода...
Я пытаюсь создать клейкую таблицу с столбцом, который отображает массив структуры с определенной схемой. Используя экспериментальную конструкцию AWS_GLUE_ALPHA , определение массива структуры возможно только с помощью этого синтаксиса кода...
Я пытаюсь создать клейкую таблицу с столбцом, который отображает массив структуры с определенной схемой. Используя экспериментальную конструкцию AWS_GLUE_ALPHA , определение массива структуры возможно только с помощью этого синтаксиса кода...