Бессерверная обработка данных, медленное использование темы KafkaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Бессерверная обработка данных, медленное использование темы Kafka

Сообщение Anonymous »

Я использую бессерверную обработку данных с API Java для чтения темы Kafka. В теме всего 2 раздела.
Тема получает 80 сообщений в секунду.
После чтения сообщений я перераспределяю данные на 100, преобразую данные, а затем записываю в BQ.
После перераспределения я вижу два этапа: один с 2 задачами для чтения данных и второй с 10 задачами, который преобразует и записывает данные.
При наличии
streaming.max.offsets.per.trigger=500
streaming.min.offsets.per.trigger=100
Задача чтения сильно варьируется во времени от 7 секунд до 1 минуты 50 минут.
А задача, которая преобразует данные dwrites, занимает около 10 секунд.
Есть идеи о том, почему чтение данных занимает слишком много времени и как оптимизировать код?

Код: Выделить всё

Dataset dfr = spark
.readStream()
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", kafkaServers)
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", trustStoreName)
.option("kafka.ssl.truststore.password", truststorePassword)
.option("kafka.ssl.truststore.type", "JKS")
.option("startingOffsets", "latest")
.option("kafka.max.partition.fetch.bytes", "209715200")  // 200MB per partition
.option("kafka.fetch.max.bytes", "1048576000")            // 1000MB total
.option("subscribe", kafkaTopic)
.option("maxOffsetsPerTrigger", maxOffsets)
.option("minOffsetsPerTrigger", minOffsets)
.option("failOnDataLoss", "false")
.option("kafka.request.timeout.ms", 300000)
.option("kafka.session.timeout.ms", 60000)
.load();

Dataset dfr2 = dfr.selectExpr(
"CAST(topic as STRING) as topic", "CAST(key AS STRING) AS key",
"CAST(value AS STRING) AS xml",
"timestamp", "partition", "offset").repartition(10);

StructType outSchema = new StructType()
.add("key", DataTypes.StringType)
.add("topic", DataTypes.StringType)
.add("partition", DataTypes.IntegerType)
.add("offset", DataTypes.LongType)
.add("JSON_COL", DataTypes.StringType)
.add("DAT_MAJ_DWH", DataTypes.StringType);

// Create proper encoder - cast the result to Encoder
Encoder encoder = Encoders.row(outSchema);

Dataset jsonified = dfr2.mapPartitions(
(MapPartitionsFunction) (Iterator it) -> {
List out = new ArrayList();
DocumentBuilder builder = XML_BUILDER.get();
while (it.hasNext()) {
Row r = it.next();
String topic = r.getString(0);
String key = r.getString(1);
String xml = r.getString(2);
int part = r.getInt(4);
long offset = r.getLong(5);
String json = null;
try {
builder.reset();
// Pass the XML string, not the Document
json = BusMessageXmlJson.toJson(xml);
} catch (Exception ex) {
System.err.println("XML parse error partition=" + part +
" offset=" + offset + " msg="  + ex.getMessage());
}
String ts = r.getTimestamp(3).toInstant().toString();
out.add(RowFactory.create(key, topic, part, offset, json, ts));
}
return out.iterator();
},
encoder
).withColumn(
"DAT_MAJ_DWH",
date_format(to_timestamp(col("DAT_MAJ_DWH")), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
).select("key","topic","partition","offset","JSON_COL","DAT_MAJ_DWH");

StreamingQuery query = jsonified
.writeStream()
.queryName("spark-sdh-ndc-streaming-query")
.foreachBatch((batchDF, batchId) -> {

// Write this batch to BigQuery using batch API
batchDF.select("JSON_COL", "DAT_MAJ_DWH").write()
.format("bigquery")
.option("temporaryGcsBucket", tempBucket)
.option("table", bigQueryTable)
.option("createDisposition", "CREATE_IF_NEEDED")
.option("intermediateFormat", "avro")
.option("writeMethod", "indirect")
.option("allowFieldAddition", "true")
.option("allowFieldRelaxation", "true")
.mode(SaveMode.Append)
.save();

// 2. Commit offsets to Kafka AFTER successful write
commitOffsetsToKafka(batchDF, kafkaServers, trustStoreName, truststorePassword, consumerGroup);

System.out.println("Batch " + batchId + " written successfully");
})
.option("checkpointLocation", checkpointPath)
.trigger(Trigger.ProcessingTime(triggerInterval))
.start();
System.out.println("Streaming query started successfully!");
System.out.println("Query ID: " + query.id());
System.out.println("Waiting for termination... (Ctrl+C to stop)");

query.awaitTermination();
Изображение


Подробнее здесь: https://stackoverflow.com/questions/797 ... afka-topic
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»