Пружинная интеграция несколько соединений TCP Ping, отправитьJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Пружинная интеграция несколько соединений TCP Ping, отправить

Сообщение Anonymous »

Я внедряю клиент TCP в пружинной интеграции с несколькими подключениями к сокетам и столкнулся с проблемой с поддержанием всех соединений.

Код: Выделить всё

@Slf4j
@Configuration
@EnableIntegration
public class TcpClientConfig {

@Value("${tcp.server.ip}")
private String ip;

@Value("${tcp.server.port}")
private int port;

@Value("${tcp.active.socket}")
private int activeSocket;

public static final String TCP_DEFAULT_CHANNEL = "tcp-client-channel";

private final TcpClientService tcpClientService;

public TcpClientConfig(TcpClientService tcpClientService) {
this.tcpClientService = tcpClientService;
}

@Bean
public List cachingClientConnectionFactories() {
List factories = new ArrayList();

for (int i=0; i {
if (event instanceof TcpConnectionOpenEvent) {
TcpConnection connection = (TcpConnection) ((TcpConnectionOpenEvent) event).getSource();
log.info("TCP Connection Opened: {}", connection.getConnectionId());
} else if (event instanceof TcpConnectionCloseEvent) {
TcpConnection closeConnection = (TcpConnection) ((TcpConnectionCloseEvent) event).getSource();
log.warn("TCP Connection Closed: {}", closeConnection.getConnectionId());

try {
if (!Thread.currentThread().isInterrupted()) {
log.info(factory.getConnection().getConnectionId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Connection retrieval interrupted", e);
}

tcpClientService.connect("N");
}
});

CachingClientConnectionFactory cachingFactory = new CachingClientConnectionFactory(factory, activeSocket);
cachingFactory.setLeaveOpen(true);
return cachingFactory;
}

@Bean
@ServiceActivator(inputChannel = TCP_DEFAULT_CHANNEL)
public TcpOutboundGateway tcpOutboundGateway(List cachingClientConnectionFactories) {
TcpOutboundGateway gateway = new TcpOutboundGateway();

FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(cachingClientConnectionFactories);
gateway.setConnectionFactory(failoverFactory);
gateway.setRequiresReply(true);

return gateway;
}

}
Для общения я определил шлюз обмена сообщениями :
@Component
@MessagingGateway(defaultRequestChannel = TcpClientConfig.TCP_DEFAULT_CHANNEL)
public interface TcpClientGateway {
Map send(Map request);
}
< /code>
tcpclientservice < /strong>
Я выполнил запланированную задачу для отправки сообщений Ping: < /p>
public void init() {
isTaskRunning.set(true);

log.info("TCP Connection init");
int successCount = 0;

try {
while(successCount < activeSocket) {
try {
int socketNumber = successCount + 1;
log.info("Attempting to connect socket #{}", socketNumber);

Future future = executorService.submit(() -> connect("N"));

boolean connected = future.get(3, TimeUnit.SECONDS);

if (connected) {
log.info("Successfully connected socket #{}", socketNumber);
successCount++;
} else {
log.error("Failed to connect socket #{}", socketNumber);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Error while connecting socket #{}: {}", successCount + 1, e.getMessage());
} finally {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
log.error("Error while connecting socket #{}: {}", successCount + 1, e.getMessage());
}
}
}

} catch (Exception e) {
log.error("Error during initialization: {}", e.getMessage());
Thread.currentThread().interrupt();
}

isTaskRunning.set(false);
}

public boolean connect(String isReportLine) {
log.info("TCP Connection connect");

try {
Map connect = new LinkedHashMap();

connect.put("BEGIN", "CONNECT");

Map result = clientGateway.send(connect);

if (!"100".equals(result.get("CODE"))) {
log.error("Gateway Connect Fail");
return false;
}

return true;
} catch (Exception e) {
log.error("[CONNECT] message='{}'", e.getMessage());
return false;
}
}

@Scheduled(fixedRate = 30 * 1000)
public void sendPings() {
log.info("### Send Ping ###");
for (Map.Entry entry : activeConnections.entrySet()) {

this.sendPing();
}
}

public void sendPing() {
try {
if (isTaskRunning.get()) {
return;
}

Map ping = new LinkedHashMap();
ping.put("BEGIN", "PING");

Map result = clientGateway.send(ping);

if (!"100".equals(result.get("CODE"))) {
log.error("[PING] error");
return;
}

Thread.sleep(1000);
} catch (Exception e) {
log.error("[PING] err: {}", e.getMessage());
}
}

public void sendMessage() {
try {
if (isTaskRunning.get()) {
return;
}

Map send = new LinkedHashMap();
send.put("BEGIN", "SEND");

Map result = clientGateway.send(send);

if (!"100".equals(result.get("CODE"))) {
log.error("[PING] error");
return;
}

Thread.sleep(1000);
} catch (Exception e) {
log.error("[PING] err: {}", e.getMessage());
}
}
...

< /code>
Проблема < /strong>
моя реализация Ping работает и получает ответы, но я считаю, что она не использует все соединения сокетов, которые я создал. Когда я отправляю пинги через метод clientgateway.send (), кажется, что failoverClientConnectionFactory всегда выбирает одно и то же соединение, а не ездить на велосипеде через все доступные соединения.
В результате некоторые соединения остаются бездействующими и в конечном итоге закрываются на сервере, что мне нужно < /raster> < /ping, я должен < /pshope spect yeafe a sopectore. Получает сообщение Ping каждые 30 секунд, чтобы поддерживать его живым. Для обычного трафика сообщений я все еще хочу использовать функциональность отказа для автоматического выбора доступного подключения. Выберите одно из соединений, созданных на шаге 1 для отправки сообщений и получения ответов.>

Подробнее здесь: https://stackoverflow.com/questions/794 ... -ping-send
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»