Упрощенный, общий код того, что у меня сейчас есть, так:
Основной класс
Код: Выделить всё
public static void main(String[] args) throws Exception {
... // setting configs
Processing pr = new Processing(...); // initialising all the classes here
pr.run();
}
Код: Выделить всё
private DummyClass dummyClass;
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
myData.collectAsList().forEach(row -> {
String myField = row.getAs("colName")
... //some more work
dummyClass.createParquet(myField);
});
}
Код: Выделить всё
private SparkUtility sparkUtility;
// constructor here
public void createParquet(String myField) {
List rowVals = new ArrayList();
StructType schema = createSchema();
...// some work to populate rowVals list
String s3Path = "s3a://bucket/key/key/";
sparkUtility.writeParquet(rowVals,schema,s3Path);
}
private StructType createSchema() {
StructType structType = new StructType();
structType = structType.add("col1", DataTypes.StringType, false);
structType = structType.add("col1w", DataTypes.StringType, false);
return structType;
}
Код: Выделить всё
private SparkSession session;
// constructor here
private SparkSession getSparkSession() {
SparkConf sparkConf = new SparkConf()
.setAppName("myName")
// further settings here
.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
return SparkSession.builder().config(sparkConf).getOrCreate();
}
public void writeParquet(List entries, StructType structType,String path) {
session.createDataFrame(entries,structType)
.write().mode("overwrite").format("parquet").save(path);
}
< /code>
Это работает, и это нормально. Тем не менее, теперь я хочу внести изменения в класс обработки
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
kafkaDF.foreachPartition(partition -> {
DummyClass dummy = new DummyClass(...); // initialising classes in executors
partition.forEachRemaining(record -> {
String myField = row.getAs("colName");
... //some more work
dummyClass.createParquet(myField);
});
});
< /code>
Остальная часть кода сейчас не изменилась. Код выполняется нормально, но не может сохранять данные и бросает следующее исключение: < /p>
Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null
< /code>
Из того, что я понимаю, это потому, что я пытаюсь использовать Spark Session у исполнителей. Итак, как я могу преобразовать набор данных в Parquet и сохранить в S3? Если есть способ получить доступ к сеансу и сказать ему сохранить данные с помощью. И различные попытки получить сеанс приводит к той же ошибке.
Подробнее здесь: https://stackoverflow.com/questions/795 ... java-spark