Код: Выделить всё
Connection ID:0030887-f9c8-429f-b5d1-eda15330e262:1 connected to server: amqps://my.queue.address:54321
Есть ли конфигурация или способ уменьшить количество соединений, созданных Apache Qpid в приложениях Apache Beam?
Мой код
Я вызываю модуль записи в следующем методе :
Код: Выделить всё
@NotNull
@Override
public POutput expand(@NotNull PCollection input) {
input.apply(
"WriteMessageToMyTopic",
JmsIOWriter.writeMessage());
return PDone.in(input.getPipeline());
}
Вот код средства записи
Код: Выделить всё
package com.example;
import com.example.StaticConnectionFactory;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.RetryConfiguration;
final class JmsIOWriter implements Serializable {
public static JmsIO.Write writeMessage() {
var connectionFactory = StaticConnectionFactory.getFactory();
return JmsIO.write()
.withRetryConfiguration(RetryConfiguration.create(5))
.withConnectionFactory(connectionFactory)
.withTopicNameMapper(m -> topicMapper(m, "my/topic/"))
.withValueMapper(JmsIOWriter::valueMapper);
}
private static Message valueMapper(String element, Session session) {
try {
return session.createTextMessage(element);
} catch (JMSException ex) {
throw new IllegalStateException(
String.format("Unable to send message in queue %s", element), ex);
}
}
private static String topicMapper(String message) {
return "my/topic/" + message.length;
}
}
Я использую статическую фабрику соединений, которую я внедряю в модуль записи
Код: Выделить всё
package com.example;
import org.apache.qpid.jms.JmsConnectionFactory;
public final class StaticConnectionFactory {
private static JmsConnectionFactory factory;
public static synchronized JmsConnectionFactory getFactory() {
if (factory == null) {
factory = new SslJmsConnectionFactory();
factory.setUsername("myUsername");
factory.setPassword("myPassword");
factory.setRemoteURI("amqps://my.queue.address:54321");
factory.setForceAsyncAcks(true);
factory.setReceiveLocalOnly(true);
factory.setReceiveNoWaitLocalOnly(true);
}
return factory;
}
}
Эта статическая фабрика соединений возвращает фабрику SSL-соединений
Код: Выделить всё
package com.example;
import static com.example.SslContextFactory.createSslContext;
import java.util.Base64;
import javax.jms.Connection;
import javax.jms.JMSException;
public class SslJmsConnectionFactory extends org.apache.qpid.jms.JmsConnectionFactory {
@Override
public Connection createConnection(String username, String password) throws JMSException {
initSslContext();
return super.createConnection(username, password);
}
private void initSslContext() {
setSslContext(
createSslContext(
Base64.getDecoder().decode(this.getUsername()), this.getPassword().toCharArray()));
}
}
Эта фабрика SSL-соединений использует фабрику контекста SSL для инициализации контекста SSL
Код: Выделить всё
package com.example;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
public final class SslContextFactory {
public static SSLContext createSslContext(byte[] credentials, char[] password) {
try (var inputStream = new ByteArrayInputStream(credentials)) {
var keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(inputStream, password);
return initSslContext(keyStore, password);
} catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
throw new IllegalStateException("Cannot create SSL context", e);
}
}
private static SSLContext initSslContext(KeyStore keyStore, char[] password) {
try {
var sslContext = SSLContext.getInstance("TLSv1.2");
var keyManagerFactory = createKeyManagerFactory(keyStore, password);
sslContext.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());
return sslContext;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IllegalStateException("Unable to initialize SSL context", e);
}
}
private static KeyManagerFactory createKeyManagerFactory(KeyStore keyStore, char[] password) {
try {
var keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
keyManagerFactory.init(keyStore, password);
return keyManagerFactory;
} catch (NoSuchAlgorithmException | KeyStoreException | UnrecoverableKeyException e) {
throw new IllegalStateException("Unable to initialize a key manager factory", e);
}
}
}
- Использовать статическую фабрику соединений JMS, чтобы избежать повторного создания соединений, никакого эффекта.
- Настройте контекст SSL при инициализации фабрики соединений JMS вместо ее инициализации при создании соединения. Мне не удалось подключиться к очереди из-за org.apache.qpid.jms.provider.Exceptions.ProviderConnectionSecuritySaslException: клиенту не удалось пройти аутентификацию с использованием SASL: PLAIN
- В настоящее время я не могу используйте новый Solace Writer, доступный начиная с Apache Beam 2.61.0, поскольку он помечен как экспериментальный
Подробнее здесь: https://stackoverflow.com/questions/792 ... lace-using