Я использую Spring Integration и у меня нет информации о базовой реализации TCP-сервера, но структура сообщения состоит из заголовка с 4 байтами символов ASCII, используемых для указания длина сообщения (исключая заголовок), за которым следует само сообщение.
Например, если длина сообщения составляет 128 байт, в начало будет добавлено значение заголовка «0128». сообщения. Таким образом, фактическая длина отправляемых данных составляет 132 байта.
Пример сообщения будет таким: 0022thisisanexamplemessage
Я следую этому опубликуйте настройку 2 экземпляров TcpSendingMessageHandler с 2 экземплярами FailoverClientConnectionFactory и используйте тот же outboundChannel в качестве входного канала для достижения циклического распределения, как показано ниже (и успешной отправки сообщений на TCP-сервер):< /p>
Код: Выделить всё
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelOne() {
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(failoverClientConnectionFactoryOne());
handler.setClientMode(true);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelTwo() {
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(failoverClientConnectionFactoryTwo());
handler.setClientMode(true);
return handler;
}
@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryOne() {
List factories = new ArrayList();
factories.add(tcpNetClientConnectionFactoryOne());
factories.add(tcpNetClientConnectionFactoryTwo());
FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
cf.setLeaveOpen(true);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setCloseOnRefresh(true);
cf.setRefreshSharedInterval(10000L);
return cf;
}
@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryTwo() {
List factories = new ArrayList();
factories.add(tcpNetClientConnectionFactoryTwo());
factories.add(tcpNetClientConnectionFactoryOne());
FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
cf.setLeaveOpen(true);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setCloseOnRefresh(true);
cf.setRefreshSharedInterval(10000L);
return cf;
}
@Bean
public TcpNetClientConnectionFactoryOne tcpNetClientConnectionFactoryOne() {
TcpNetClientConnectionFactoryOne cf = new TcpNetClientConnectionFactoryOne(tcpServerHost, tcpServerPort1);
cf.setLeaveOpen(true);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setSerializer(codec());
cf.setDeserializer(codec());
cf.setConnectionTimeout(connectionTimeout);
return cf;
}
@Bean
public TcpNetClientConnectionFactoryOne tcpNetClientConnectionFactoryTwo() {
TcpNetClientConnectionFactoryOne cf = new TcpNetClientConnectionFactoryOne(tcpServerHost, tcpServerPort2);
cf.setLeaveOpen(true);
cf.setSingleUse(false);
cf.setSoKeepAlive(true);
cf.setSerializer(codec());
cf.setDeserializer(codec());
cf.setConnectionTimeout(connectionTimeout);
return cf;
}
private ByteArrayLengthHeaderSerializer codec() {
ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer();
serializer.setMaxMessageSize(8 * 1024);
serializer.setInclusive(false);
return serializer;
}
Код: Выделить всё
@Bean
public MessageChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer tcpInboundChannelOne() {
TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryOne());
inboundChannelAdapter.setOutputChannel(inboundChannel());
return inboundChannelAdapter;
}
@Bean
public MessageProducer tcpInboundChannelTwo() {
TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryTwo());
inboundChannelAdapter.setOutputChannel(inboundChannel());
return inboundChannelAdapter;
}
@Bean
public IntegrationFlow tcpInboundFlow() {
return IntegrationFlow.from(inboundChannel())
.handle(message -> handleMessage(byte[] message.getPayload()))
.get();
}
private void handleMessage(byte[] payload) {
// Process incoming message here
log.info("Received: {}", new String(payload));
}
Подробнее здесь: https://stackoverflow.com/questions/787 ... -tcp-serve