Можно ли написать данные от исполнителей Spark в Java Spark?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Можно ли написать данные от исполнителей Spark в Java Spark?

Сообщение Anonymous »

У меня есть приложение Java Spark, которое получает данные от Kafka, выполняет некоторую работу по указанным данным, а затем сохраняет паркетные файлы в S3, используя команду spark .write () . До этого момента мое приложение сохраняло все полученные данные в драйвере Spark, а затем сохранило бы данные, используя текущий Spark Session. Что работает нормально.
Упрощенный, общий код того, что у меня сейчас есть, так:
Основной класс

Код: Выделить всё

public static void main(String[] args) throws Exception {
... // setting configs
Processing pr = new Processing(...); // initialising all the classes here
pr.run();
}
класс обработки

Код: Выделить всё

private DummyClass dummyClass;

// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
myData.collectAsList().forEach(row -> {
String myField = row.getAs("colName")
... //some more work
dummyClass.createParquet(myField);
});
}
класс Dummyclass

Код: Выделить всё

private SparkUtility sparkUtility;

// constructor here

public void createParquet(String myField) {
List rowVals = new ArrayList();
StructType schema = createSchema();
...// some work to populate rowVals list
String s3Path = "s3a://bucket/key/key/";
sparkUtility.writeParquet(rowVals,schema,s3Path);
}

private StructType createSchema() {
StructType structType = new StructType();
structType = structType.add("col1", DataTypes.StringType, false);
structType = structType.add("col1w", DataTypes.StringType, false);
return structType;
}
Class Class

Код: Выделить всё

private SparkSession session;

// constructor here

private SparkSession getSparkSession() {
SparkConf sparkConf = new SparkConf()
.setAppName("myName")
// further settings here
.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
return SparkSession.builder().config(sparkConf).getOrCreate();
}

public void writeParquet(List entries, StructType structType,String path) {
session.createDataFrame(entries,structType)
.write().mode("overwrite").format("parquet").save(path);
}
< /code>
Это работает, и это нормально. Тем не менее, теперь я хочу внести изменения в класс обработки 
как так:
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
kafkaDF.foreachPartition(partition -> {
DummyClass dummy = new DummyClass(...); // initialising classes in executors
partition.forEachRemaining(record -> {
String myField = row.getAs("colName");
... //some more work
dummyClass.createParquet(myField);
});
});
< /code>
Остальная часть кода сейчас не изменилась. Код выполняется нормально, но не может сохранять данные и бросает следующее исключение: < /p>
Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null
< /code>
Из того, что я понимаю, это потому, что я пытаюсь использовать Spark Session у исполнителей. Итак, как я могу преобразовать набор данных в Parquet и сохранить в S3? Если есть способ получить доступ к сеансу и сказать ему сохранить данные с помощью. И различные попытки получить сеанс приводит к той же ошибке.

Подробнее здесь: https://stackoverflow.com/questions/795 ... java-spark
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»