Как я могу ограничить количество создаваемых соединений при подключении к Solace с помощью библиотеки qpid JMS в приложеJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу ограничить количество создаваемых соединений при подключении к Solace с помощью библиотеки qpid JMS в приложе

Сообщение Anonymous »

Мое приложение Apache Beam отправляет сообщения в утешительную очередь. Чтобы отправить сообщение, я использую библиотеку apache qpid и службу сообщений Java (JMS). Однако когда я запускаю свое приложение, для двух узлов создается более 2000 подключений. Когда я запускаю приложение луча, у меня появляется более 2000 сообщений журналов, подобных следующим:

Код: Выделить всё

Connection ID:0030887-f9c8-429f-b5d1-eda15330e262:1 connected to server: amqps://my.queue.address:54321
Меня это беспокоит, так как количество подключений к Solace ограничено (400 подключений).
Есть ли конфигурация или способ уменьшить количество соединений, созданных Apache Qpid в приложениях Apache Beam?
Мой код
Я вызываю модуль записи в следующем методе :

Код: Выделить всё

@NotNull
@Override
public POutput expand(@NotNull PCollection input) {
input.apply(
"WriteMessageToMyTopic",
JmsIOWriter.writeMessage());

return PDone.in(input.getPipeline());
}
JMS Writer
Вот код средства записи

Код: Выделить всё

package com.example;

import com.example.StaticConnectionFactory;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.RetryConfiguration;

final class JmsIOWriter implements Serializable {

public static JmsIO.Write writeMessage() {
var connectionFactory = StaticConnectionFactory.getFactory();

return JmsIO.write()
.withRetryConfiguration(RetryConfiguration.create(5))
.withConnectionFactory(connectionFactory)
.withTopicNameMapper(m -> topicMapper(m, "my/topic/"))
.withValueMapper(JmsIOWriter::valueMapper);
}

private static Message valueMapper(String element, Session session) {
try {
return session.createTextMessage(element);
} catch (JMSException ex) {
throw new IllegalStateException(
String.format("Unable to send message in queue %s", element), ex);
}
}

private static String topicMapper(String message) {
return "my/topic/"  + message.length;
}
}
Фабрика соединений
Я использую статическую фабрику соединений, которую я внедряю в модуль записи

Код: Выделить всё

package com.example;

import org.apache.qpid.jms.JmsConnectionFactory;

public final class StaticConnectionFactory {

private static JmsConnectionFactory factory;

public static synchronized JmsConnectionFactory getFactory() {
if (factory == null) {
factory = new SslJmsConnectionFactory();
factory.setUsername("myUsername");
factory.setPassword("myPassword");
factory.setRemoteURI("amqps://my.queue.address:54321");
factory.setForceAsyncAcks(true);
factory.setReceiveLocalOnly(true);
factory.setReceiveNoWaitLocalOnly(true);
}
return factory;
}
}
Фабрика SSL-соединений
Эта статическая фабрика соединений возвращает фабрику SSL-соединений

Код: Выделить всё

package com.example;

import static com.example.SslContextFactory.createSslContext;

import java.util.Base64;
import javax.jms.Connection;
import javax.jms.JMSException;

public class SslJmsConnectionFactory extends org.apache.qpid.jms.JmsConnectionFactory {

@Override
public Connection createConnection(String username, String password) throws JMSException {
initSslContext();
return super.createConnection(username, password);
}

private void initSslContext() {
setSslContext(
createSslContext(
Base64.getDecoder().decode(this.getUsername()), this.getPassword().toCharArray()));
}
}
Фабрика контекста SSL
Эта фабрика SSL-соединений использует фабрику контекста SSL для инициализации контекста SSL

Код: Выделить всё

package com.example;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

public final class SslContextFactory {

public static SSLContext createSslContext(byte[] credentials, char[] password) {

try (var inputStream = new ByteArrayInputStream(credentials)) {
var keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(inputStream, password);
return initSslContext(keyStore, password);
} catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
throw new IllegalStateException("Cannot create SSL context", e);
}
}

private static SSLContext initSslContext(KeyStore keyStore, char[] password) {
try {
var sslContext = SSLContext.getInstance("TLSv1.2");
var keyManagerFactory = createKeyManagerFactory(keyStore, password);
sslContext.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());
return sslContext;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IllegalStateException("Unable to initialize SSL context", e);
}
}

private static KeyManagerFactory createKeyManagerFactory(KeyStore keyStore, char[] password) {
try {
var keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(keyStore, password);
return keyManagerFactory;
} catch (NoSuchAlgorithmException | KeyStoreException | UnrecoverableKeyException e) {
throw new IllegalStateException("Unable to initialize a key manager factory", e);
}
}
}
Что я пробовал
  • Использовать статическую фабрику соединений JMS, чтобы избежать повторного создания соединений, никакого эффекта.
  • Настройте контекст SSL при инициализации фабрики соединений JMS вместо ее инициализации при создании соединения. Мне не удалось подключиться к очереди из-за org.apache.qpid.jms.provider.Exceptions.ProviderConnectionSecuritySaslException: клиенту не удалось пройти аутентификацию с использованием SASL: PLAIN
  • В настоящее время я не могу используйте новый Solace Writer, доступный начиная с Apache Beam 2.61.0, поскольку он помечен как экспериментальный


Подробнее здесь: https://stackoverflow.com/questions/792 ... lace-using
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»