Код: Выделить всё
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
< /code>
Проблема в том, что я не знаю, что указывать в поле «RemotEderectory» (они используют ». Начните, если я не использую это поле. При использовании rowatingserveradvice я предоставляю удаленные каталоги таким образом: < /p>
@Bean
public RotatingServerAdvice advice() {
List keyDirectories = new ArrayList();
for (String remoteDir: properties.getRemoteDirectory()) {
keyDirectories.add(new RotationPolicy.KeyDirectory(SESSION_1_KEY, remoteDir));
}
return new RotatingServerAdvice(sftpSessionFactory(), keyDirectories);
}
Полный код моей конфигурации SFTP ниже: < /p>
Код: Выделить всё
@Configuration
@RequiredArgsConstructor
public class SFTPConfig {
private static final String SESSION_1_KEY = "SESSION_1";
private final SFTPProperties properties;
private final TimeService timeService;
@Value("${cron.parse}")
private String cronParse;
@Bean
public DelegatingSessionFactory sftpSessionFactory() {
DefaultSftpSessionFactory defaultSftpSessionFactory = new DefaultSftpSessionFactory();
defaultSftpSessionFactory.setHost(properties.getHost());
defaultSftpSessionFactory.setPort(properties.getPort());
defaultSftpSessionFactory.setUser(properties.getUsername());
defaultSftpSessionFactory.setPassword(properties.getPassword());
defaultSftpSessionFactory.setAllowUnknownKeys(true);
CachingSessionFactory cachingSessionFactory =
new CachingSessionFactory(defaultSftpSessionFactory);
cachingSessionFactory.setPoolSize(properties.getPoolSize());
cachingSessionFactory.setSessionWaitTimeout(properties.getTimeout());
Map sessionMap = new HashMap();
sessionMap.put(SESSION_1_KEY, cachingSessionFactory);
return new DelegatingSessionFactory(sessionMap, cachingSessionFactory);
}
@Bean
public PollableChannel sftpChannel() {
return new QueueChannel();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
@Bean
public MessageHandler messageHandler(MyService myService, DirectoriesProperties directoriesProperties,
FileService fileService, IHub hub) {
return new MyMessageHandler(myService, directoriesProperties, fileService, hub);
}
@Bean
public IntegrationFlow integrationFlow(MyService myService, DirectoriesProperties directoriesProperties,FileService fileService, IHub hub) {
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory())
.filter(filter())
.localDirectory(new File(properties.getLocalDirectory()))
.autoCreateLocalDirectory(true)
.deleteRemoteFiles(true)
.localFilter(new AcceptOnceFileListFilter())
.remoteDirectory("."),
e -> e.poller(Pollers.cron(cronParse).advice(advice())))
.channel(sftpChannel())
.handle(messageHandler(myService, directoriesProperties, fileService, hub))
.get();
}
@Bean
public RotatingServerAdvice advice() {
List keyDirectories = new ArrayList();
for (String remoteDir: properties.getRemoteDirectory()) {
keyDirectories.add(new RotationPolicy.KeyDirectory(SESSION_1_KEY, remoteDir));
}
return new RotatingServerAdvice(sftpSessionFactory(), keyDirectories);
}
private ChainFileListFilter filter() {
ChainFileListFilter filter = new ChainFileListFilter();
filter.addFilter(new SftpRegexPatternFileListFilter("files_\\d{12}.txt"));
filter.addFilter(new TimestampPeriodFileListFilter(timeService, properties.getPeriod()));
return filter;
}
}
< /code>
Я потратил несколько дней, пытаясь сделать Delegatingsessession работать. Раньше это было просто CachingSessionFactory с одним удаленным каталогом для использования. Пожалуйста, помогите мне понять, что я имею в виду с .Remotedirectory (".") Обновление
artem, спасибо за ваше объяснение. Я думал, что моя проблема была с .Remotedirectory (".") , но теперь у меня нет предположения, почему мой интеграционный тест не проходит ... вот код:
Код: Выделить всё
public class ParsingTest extends BaseTest {
@Autowired
private MessageHandler messageHandler;
@Autowired
private MessageSource messageSource;
@Test
void shouldSaveAllCalls() {
String filename = "files_010120230300.txt";
String resourcePath = "/files/good/" + filename;
prepareFileInRemoteSftpServer(resourcePath, filename);
receiveAndHandleFileFromSftp();
}
private void prepareFileInRemoteSftpServer(String resourcePath, String remoteFilename) {
MountableFile mountableFile = MountableFile.forClasspathResource(resourcePath);
sftpContainer.copyFileToContainer(mountableFile, sftpRemotePath + remoteFilename);
}
private void receiveAndHandleFileFromSftp() {
Message message = messageSource.receive();
assertThat(message).isNotNull();
messageHandler.handleMessage(message);
}
}
Не могли бы вы сказать мне где я ошибаюсь? Мое единственное предложение - это неправильное использование сообщений.>
Подробнее здесь: https://stackoverflow.com/questions/759 ... g-delegati