Как исправить org.apache.spark.SparkException: задание прервано из-за сбоя этапа Task & com.datastax.spark.connector.rddJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как исправить org.apache.spark.SparkException: задание прервано из-за сбоя этапа Task & com.datastax.spark.connector.rdd

Сообщение Anonymous »

В моем проекте я использую Spark-Cassandra-Connector для чтения таблицы из Cassandra и дальнейшей обработки ее в JavaRDD, но я столкнулся с проблемой при обработке строки Cassandra в javaRDD.

Код: Выделить всё

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 52, 172.20.0.4, executor 1):
java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:370)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Я настроил свою искру для использования искрового кластера. Когда я использую мастер как локальный, код работает нормально, но как только я заменяю его мастером, у меня возникает проблема.
Вот моя конфигурация искры:

Код: Выделить всё

SparkConf sparkConf = new SparkConf().setAppName("Data Transformation")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("spark://masterip:7077");

sparkConf.set("spark.cassandra.connection.host", cassandraContactPoints);
sparkConf.set("spark.cassandra.connection.port", cassandraPort);
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
sparkConf.set("spark.driver.allowMultipleContexts", "true");

/*
* sparkConf.set("spark.cassandra.auth.username", "centralrw");
* sparkConf.set("spark.cassandra.auth.password", "t8b9HRWy");
*/
logger.info("creating spark context object");
sparkContext = new JavaSparkContext(sparkConf);
logger.info("returning sparkcontext object");
return sparkContext;
Версия Spark — 2.4.0
spark-Cassandra_connector — 2.4.0

ReceiverConfig:< /p>

Код: Выделить всё

public List readDataFromGenericTriggerEntityUsingSpark(
JavaSparkContext sparkContext) {

List genericTriggerEntityList = new ArrayList();

try {

logger.info("Keyspace & table name to read data from cassandra");
String tableName = "generictriggerentity";
String keySpace = "centraldatalake";

logger.info("establishing conection");
CassandraJavaRDD cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext)
.cassandraTable(keySpace, tableName);
int num = cassandraRDD.getNumPartitions();
System.out.println("num- " + num);

logger.info("Converting extracted rows to JavaRDD");
JavaRDD rdd = cassandraRDD
.map(new Function() {

private static final long serialVersionUID = -165799649937652815L;

@Override
public Map call(CassandraRow row) throws Exception {
Map  genericTriggerEntityMap = new HashMap();
GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
if (row.getString("end") != null)
genericTriggerEntity.setEnd(row.getString("end"));
if (row.getString("key") != null)
genericTriggerEntity.setKey(row.getString("key"));
if (row.getString("keyspacename") != null)
genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
if (row.getString("partitiondeleted") != null)
genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
if (row.getString("rowdeleted") != null)
genericTriggerEntity.setRowdeleted(row.getString("rowdeleted"));
if (row.getString("rows") != null)
genericTriggerEntity.setRows(row.getString("rows"));
if (row.getString("start") != null)
genericTriggerEntity.setStart(row.getString("start"));
if (row.getString("tablename") != null) {
genericTriggerEntity.setTablename(row.getString("tablename"));
dataTableName = row.getString("tablename");
}
if (row.getString("triggerdate") != null)
genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
if (row.getString("triggertime") != null)
genericTriggerEntity.setTriggertime(row.getString("triggertime"));
if (row.getString("uuid") != null)
genericTriggerEntity.setUuid(row.getUUID("uuid"));

genericTriggerEntityMap.put(dataTableName, genericTriggerEntity);
return genericTriggerEntityMap;
}

});

List
 partition = rdd.partitions();
System.out.println("partion - " + partition.size());

logger.info("Collecting data into rdd");
genericTriggerEntityList = rdd.collect();

} catch (Exception e) {
e.printStackTrace();
}
logger.info("returning generic trigger entity list");
return genericTriggerEntityList;
}
когда я выполняю rdd.collect(), возникает исключение

Код: Выделить всё

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 21, 10.22.3.55, executor 0): java.lang.ClassNotFoundException: in.dmart.central.data.transform.base.config.ReceiverConfig$1
Я нашел решение: создать толстую банку и включить ее в свой код, но я не хочу этого делать, поскольку каждый раз, когда я вношу какие-либо изменения, мне придется делать обработать снова, и это невозможно.

Пожалуйста, предложите какое-нибудь решение для настройки в коде или искровом кластере.
Заранее спасибо.

Подробнее здесь: https://stackoverflow.com/questions/554 ... ailure-tas
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»