Spark получает SparkException: при анализе записей обнаружены некорректные записиJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Spark получает SparkException: при анализе записей обнаружены некорректные записи

Сообщение Anonymous »

Я пытаюсь отобразить данные Avro из Kafka на консоли с помощью DataStream и поймать исключение.
Причина: org.apache.spark.SparkException: при анализе записей обнаружены некорректные записи. Текущий режим анализа: FAILFAST. Чтобы обрабатывать некорректные записи как нулевые, попробуйте установить для параметра «mode» значение «PERMISSIVE».```
Как это исправить? моя конфигурация
public static void main(String[] args) throws StreamingQueryException, IOException, TimeoutException, RestClientException {

Map kafkaParams = new HashMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", AvroDeserializer.class);
kafkaParams.put("value.deserializer", AvroDeserializer.class);
kafkaParams.put("group.id", "sparkGroupId");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);

SparkSession spark = SparkSession.builder().appName("Hello Spark").master(
"local").getOrCreate();
System.out.println("Hello, Spark v." + spark.version());
spark.sparkContext().setLogLevel("ERROR");

Subscribe to 1 topic

Dataset df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "j_client")
.option("startingOffsets", "earliest")
.load();

String schemaRegistryAddr = "http://localhost:8081";

создать объект Rest Service var restService = new RestService(schemaRegistryAddr);

// Create REST service to access schema registry and retrieve topic schema (latest)
var valueRestResponseSchema = restService.getLatestVersion("j_client" + "-value");
var jsonSchema = valueRestResponseSchema.getSchema();

var jClientDF =df.select(
col("key").cast("string"), // cast to string from binary value
from_avro(col("value"), jsonSchema).as("j_client"), // convert from avro value
col("topic"),
col("offset"),
col("timestamp"),
col("timestampType"))
;

jClientDF.printSchema();

Stream data to console for testing

jClientDF
.writeStream()
.format("console")
.start()
.awaitTermination()
;



Подробнее здесь: https://stackoverflow.com/questions/784 ... rd-parsing
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»