Flink непрерывный источник файла с периодическим и тем же файломJAVA

Программисты JAVA общаются здесь
Anonymous
Flink непрерывный источник файла с периодическим и тем же файлом

Сообщение Anonymous »

У меня есть внешняя система, которая записывает файл с одинаковым именем каждый час. Я прочитал этот каталог с Flink 1.18.1 с удалением после чтения, но после первой обработки файл не читается. Я обнаружил, что это происходит в классе непрерывнойфильсплеточкового класса, когда новый файл Filesourcesplit отфильтрован путем, потому что PathSalreadyProcasted содержит тот же путь файла из ранее чтения и удаленного файла. > private void processDiscoveredSplits(Collection splits, Throwable error) {
if (error != null) {
LOG.error("Failed to enumerate files", error);
return;
}

final Collection newSplits =
splits.stream()
.filter((split) -> pathsAlreadyProcessed.add(split.path()))
.collect(Collectors.toList());
splitAssigner.addSplits(newSplits);

assignSplits();
}

< /code>
Класс жестко кодируется в Abstractfilesource в .createspliteNumerator (), поэтому я не могу его заменить. < /p>
В то же время официальная документация гласит, что При сканировании каталога для обнаружения новых файлов последнее время изменения также принимается на сравнение. В классе Filesourcesplit есть эти данные, но он не используется в разделенном перечислении.public class FileSourceSplit implements SourceSplit, Serializable {
/** The modification time of the file, from {@link FileStatus#getModificationTime()}. */
private final long fileModificationTime;
}

< /code>
Вопросы: < /p>
  • Как я могу добавить поле FileModificationtime в эту проверку или, возможно, существуют реализации? < / li>
    Если пути к файлам постоянно добавляются в PathsalreadyPrococed, тогда, когда этот список очищается? Это важно, потому что он идет на контрольно -пропускные пункты и может занять много памяти.


Подробнее здесь: https://stackoverflow.com/questions/794 ... -same-file

Вернуться в «JAVA»