В настоящее время я пытаюсь внести изменения в проект, который использует интеграцию Spring для отправки и получения файлов с/на SFTP-сервер.
Изменения заключаются в добавлении других SFTP-серверов, выберите правильный сервер с условиями и обрабатывайте файлы таким же образом.
Сейчас я борюсь с двумя вещами.
Первая:
У меня есть канал, сообщения которого содержат заголовок с удаленным каталогом, и мне нужен доступ к нужному сеансу SFTP. Но чтобы сделать сессию, мне нужны правильные свойства. (либо config1, либо config2, которые определены в моем application.yml)
Я не уверен, как передать эту информацию в мой ServiceActivator. (Второе задание в моем коде)
Второе:
Мне нужно получить файлы с нескольких SFTP-серверов, и мне нужно сохранить эти файлы в несколько локальных каталогов. Путь между удаленным и локальным доступом не одинаков и определяется в свойствах config1 и config2 так же, как я описал в своей первой проблеме.
Я думаю, что я правильно делегирую сеанс SFTP, но Я не знаю, как установить localDirectory на основе сеанса SFTP. (Первое TODO в моем коде)
Если кто-нибудь сможет мне немного помочь, я буду очень признателен.
Заранее спасибо.< /p>
Вот мой код:
SftpConfig.Sftp1 sftpConfig1;
SftpConfig.Sftp2 sftpConfig2;
@Bean
@BridgeTo
public MessageChannel uploadChannel() {
return new PublishSubscribeChannel();
}
@Bean
public ExpressionParser spelExpressionParser() {
return new SpelExpressionParser();
}
public SessionFactory getSftpSession(SftpConfig sftp) {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftp.getHost());
factory.setPort(sftp.getPort());
factory.setUser(sftp.getUser());
factory.setPassword(sftp.getPassword());
factory.setAllowUnknownKeys(true);
factory.setTimeout(sftp.getTimeout());
log.info("timeout is set to: {}", sftp.getTimeout());
return new CachingSessionFactory(factory);
}
@Bean
public DelegatingSessionFactory delegatingSf() {
Map mapSession = new HashMap();
mapSession.put("config1", getSftpSession(sftpConfig1));
mapSession.put("config2", getSftpSession(sftpConfig2));
SessionFactoryLocator sessionFactoryLocator = new DefaultSessionFactoryLocator(mapSession);
return new DelegatingSessionFactory(sessionFactoryLocator);
}
@Bean
public RotatingServerAdvice advice() {
List keyDirectories = sftpConfig1.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList());
keyDirectories.addAll(sftpConfig2.getCodes().stream()
.map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
.collect(Collectors.toList()));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
@Bean
public IntegrationFlow sftpIntegrationFlow() {
return IntegrationFlows.from(
Sftp.inboundAdapter(delegatingSf())
.filter(new SftpSimplePatternFileListFilter("*.csv"))
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localFilter(new AbstractFileListFilter() {
@Override
public boolean accept(final File file) {
return file.getName().endsWith(".csv");
}
})
.deleteRemoteFiles(false)
.temporaryFileSuffix(".new")
.localDirectory(new File()) // TODO dynamic local directory based on sftp session
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
.log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
.channel("filesReceptionChannel")
.enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
.get();
}
@Bean
public MethodInterceptor logNoFileFoundAdvice() {
return invocation -> {
Object result = invocation.proceed();
if (result == null) {
log.info("[SFTP] No files found");
}
return result;
};
}
@Bean
public SftpRemoteFileTemplate sftpTemplate() {
return new SftpRemoteFileTemplate(sftpSession());
}
@Bean
public SessionFactory sftpSession() {
return getSftpSession(); // TODO dynamic sftp session based on message received in serviceActivator bellow
}
@Bean
@ServiceActivator(inputChannel = "uploadChannel")
public MessageHandler uploadHandler() {
return getFtpMessageHandler(sftpSession());
}
public MessageHandler getFtpMessageHandler(SessionFactory sftpSession) {
SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof File) {
return ((File) message.getPayload()).getName();
} else {
throw new IllegalArgumentException("File expected as payload.");
}
});
handler.setUseTemporaryFileName(false);
return handler;
}
Подробнее здесь: https://stackoverflow.com/questions/787 ... tory-based
Spring Integration, как динамически устанавливать локальный/удаленный каталог на основе сообщения для отправки на удален ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение