if (error != null) {
LOG.error("Failed to enumerate files", error);
return;
}
final Collection newSplits =
splits.stream()
.filter((split) -> pathsAlreadyProcessed.add(split.path()))
.collect(Collectors.toList());
splitAssigner.addSplits(newSplits);
assignSplits();
}
< /code>
Класс жестко кодируется в Abstractfilesource в .createspliteNumerator (), поэтому я не могу его заменить. < /p>
В то же время официальная документация гласит, что При сканировании каталога для обнаружения новых файлов последнее время изменения также принимается на сравнение. В классе Filesourcesplit есть эти данные, но он не используется в разделенном перечислении.public class FileSourceSplit implements SourceSplit, Serializable {
/** The modification time of the file, from {@link FileStatus#getModificationTime()}. */
private final long fileModificationTime;
}
< /code>
Вопросы: < /p>
- Как я могу добавить поле FileModificationtime в эту проверку или, возможно, существуют реализации? < / li>
Если пути к файлам постоянно добавляются в PathsalreadyPrococed, тогда, когда этот список очищается? Это важно, потому что он идет на контрольно -пропускные пункты и может занять много памяти.
Подробнее здесь: https://stackoverflow.com/questions/794 ... -same-file