Code: Select all

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Dead-letter queue: the parallel forEach adds from many threads,
// so the list must be thread-safe
final List<String> errorPaths = Collections.synchronizedList(new ArrayList<>());
final Dataset<Row> exclusionContent = sparkSession.read().parquet("s3://some/path/having/exclusion/content");

// hundredsOfPaths lists all the paths whose data must be read,
// filtered against the exclusion set, and rewritten in place
hundredsOfPaths.parallelStream().forEach(pathInContext -> {
    try {
        final Dataset<Row> originalContent = sparkSession.read().parquet(pathInContext).cache();
        // count() materializes the cache before the source path is overwritten
        final long countOriginal = originalContent.count();
        final Dataset<Row> finalContent = originalContent.except(exclusionContent).cache();
        final long finalCount = finalContent.count();
        finalContent.repartition(1)
                .write()
                .format("parquet")
                .mode(SaveMode.Overwrite)
                .save(pathInContext);
        originalContent.unpersist();
        finalContent.unpersist();
    }
    catch (Throwable e) {
        // If anything goes wrong for this path, queue it for reprocessing
        errorPaths.add(pathInContext);
    }
});
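The dead-letter list above is written to from many parallel-stream threads at once, so a plain `ArrayList` can silently lose adds. A minimal self-contained sketch (no Spark required; the loop and counts are illustrative, not from the original post) showing the thread-safe wrapper in action:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class DeadLetterDemo {
    public static void main(String[] args) {
        // Wrap the list so concurrent adds from the parallel stream
        // are not lost or corrupted.
        List<Integer> errorPaths =
                Collections.synchronizedList(new ArrayList<>());

        IntStream.range(0, 10_000).parallel().forEach(i -> {
            if (i % 2 == 0) {       // simulate every even "path" failing
                errorPaths.add(i);
            }
        });

        System.out.println(errorPaths.size()); // prints 5000: no lost updates
    }
}
```

With a bare `ArrayList` the same run would typically report fewer than 5000 entries, or throw `ArrayIndexOutOfBoundsException`, because `add` is not atomic.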
// Reprocess all the paths accumulated in errorPaths
Code: Select all

paths.stream().parallel().forEach...

More details here: https://stackoverflow.com/questions/783 ... rk-in-java
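The truncated reprocessing step can be sketched as a bounded retry loop: each pass collects its failures into a fresh dead-letter list and feeds only those back in. `processWithRetry` and the sample paths below are hypothetical names for illustration, not from the original post:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class RetryDemo {
    /**
     * Run process over items; collect failures into a fresh dead-letter
     * list and retry them, up to maxAttempts passes. Returns whatever
     * still failed after the final pass.
     */
    static <T> List<T> processWithRetry(List<T> items,
                                        Predicate<T> process,
                                        int maxAttempts) {
        List<T> pending = items;
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            List<T> failed = Collections.synchronizedList(new ArrayList<>());
            pending.parallelStream().forEach(item -> {
                if (!process.test(item)) {
                    failed.add(item);   // dead-letter queue for this pass
                }
            });
            pending = failed;           // next pass reprocesses only failures
        }
        return pending;
    }

    public static void main(String[] args) {
        // Simulate paths that fail on the first attempt, succeed on the second.
        ConcurrentHashMap<String, Integer> attempts = new ConcurrentHashMap<>();
        List<String> paths = List.of("s3://a", "s3://b", "s3://c");
        List<String> stillFailing = processWithRetry(
                paths,
                p -> attempts.merge(p, 1, Integer::sum) >= 2,
                3);
        System.out.println(stillFailing); // [] : everything recovered on retry
    }
}
```

Bounding the number of passes keeps a permanently broken path from retrying forever; anything left in the returned list can be logged or handled manually.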