Я использую бессерверную обработку данных с API Java для чтения темы Kafka. В теме всего 2 раздела.
Тема получает 200 сообщений в секунду.
После чтения сообщений я перераспределяю данные на 100, преобразовываю данные, а затем записываю в BQ.
После перераспределения я вижу два этапа: один с 2 задачами для чтения данных, а второй с 10 задачами, который преобразует и записывает данные.
При наличии
streaming.max.offsets.per.trigger=500
streaming.min.offsets.per.trigger=100
Задача чтения сильно варьируется во времени от 7 секунд до 1 минуты 50 минут.
В то время как задача преобразования данных записи занимает около 10 секунд.
Любая идея о том, почему чтение данных занимает слишком много времени и как оптимизировать код ?
Я использую бессерверную обработку данных с API Java для чтения темы Kafka. В теме всего 2 раздела. Тема получает 200 сообщений в секунду. После чтения сообщений я перераспределяю данные на 100, преобразовываю данные, а затем записываю в BQ. После перераспределения я вижу два этапа: один с 2 задачами для чтения данных, а второй с 10 задачами, который преобразует и записывает данные. При наличии streaming.max.offsets.per.trigger=500 streaming.min.offsets.per.trigger=100 Задача чтения сильно варьируется во времени от 7 секунд до 1 минуты 50 минут. В то время как задача преобразования данных записи занимает около 10 секунд. Любая идея о том, почему чтение данных занимает слишком много времени и как оптимизировать код ? [code]Dataset dfr = spark .readStream() .format("org.apache.spark.sql.kafka010.KafkaSourceProvider") .option("kafka.bootstrap.servers", kafkaServers) .option("kafka.sasl.kerberos.service.name", "kafka") .option("kafka.sasl.mechanism", "GSSAPI") .option("kafka.security.protocol", "SASL_SSL") .option("kafka.ssl.truststore.location", trustStoreName) .option("kafka.ssl.truststore.password", truststorePassword) .option("kafka.ssl.truststore.type", "JKS") .option("startingOffsets", "latest") .option("kafka.max.partition.fetch.bytes", "209715200") // 200MB per partition .option("kafka.fetch.max.bytes", "1048576000") // 1000MB total .option("subscribe", kafkaTopic) .option("maxOffsetsPerTrigger", maxOffsets) .option("minOffsetsPerTrigger", minOffsets) .option("failOnDataLoss", "false") .option("kafka.request.timeout.ms", 300000) .option("kafka.session.timeout.ms", 60000) .load();
Dataset dfr2 = dfr.selectExpr( "CAST(topic as STRING) as topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS xml", "timestamp", "partition", "offset").repartition(10);