Получение (java.sql.sqlrecoverableException: IO Ошибка: чтение сокета прерван, аутентификация, пропасть 0 мс.) При исполJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Получение (java.sql.sqlrecoverableException: IO Ошибка: чтение сокета прерван, аутентификация, пропасть 0 мс.) При испол

Сообщение Anonymous »

В настоящее время я запускаю небольшое приложение, которое периодически опробовает данные из моего БД, а затем помещает их в тему Кафки. Во время запуска кода приложения независимо, когда я комментирую свою раковину Kafka, приложение работает правильно: < /p>

Код: Выделить всё

KafkaSink stringSink =
KafkaSinkFactory.createStringKafkaSink(props, kafkaProps, props.getProperty("asset.sync.kafka.sink.topic.name"));

dbStream
.map(Object::toString)   // or map your custom object to String
.sinkTo(stringSink)
.name("String Kafka Sink");
< /code>
Но запуск как раковина, так и источник вместе дает исключение: < /p>
java.sql.SQLRecoverableException: IO Error: Socket read interrupted, Authentication lapse 0 ms.
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:874)
at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562)
at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138)
at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
at com.zaxxer.hikari.pool.HikariPool.(HikariPool.java:115)
at com.zaxxer.hikari.HikariDataSource.(HikariDataSource.java:81)
at ***.***.hcmp.config.DBConfig.getDataSource(DBConfig.java:22)
at ***.***.hcmp.source.assetMaster.DBPollingSource.open(DBPollingSource.java:62)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Socket read interrupted, Authentication lapse 0 ms.
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:870)
...  25 common frames omitted
Caused by: java.io.InterruptedIOException: Socket read interrupted
at oracle.net.nt.TimeoutSocketChannel.handleInterrupt(TimeoutSocketChannel.java:258)
at oracle.net.nt.TimeoutSocketChannel.read(TimeoutSocketChannel.java:180)
at oracle.net.ns.NSProtocolNIO.doSocketRead(NSProtocolNIO.java:555)
at oracle.net.ns.NIOPacket.readNIOPacket(NIOPacket.java:403)
at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:127)
at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340)
at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596)
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588)
< /code>
Вот код приложения: < /p>
public class RefreshPollingApplication {

private static final Logger log = LoggerFactory.getLogger(RefreshPollingApplication.class);

public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromPropertiesFile(
RefreshPollingApplication.class.getClassLoader().getResourceAsStream("application-local.properties")
);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
Time.of(10, TimeUnit.SECONDS)
));
try{
Properties props = ConfigLoader.load("application-local.properties");
Properties kafkaProps = getKafkaProperties(props);
log.info("@@@@@@@@@@@@@ Properties loaded");

SimpleDBConfig dbConfig = new SimpleDBConfig(params.get("flink.jdbc.url"), params.get("flink.jdbc.username"), params.get("flink.jdbc.password"));
Connection connection = dbConfig.getConnection();
DataStream dbStream = env.addSource(
new JdbcPollingSource()
).setParallelism(1).name("DB Polling Source");
KafkaSink stringSink =
KafkaSinkFactory.createStringKafkaSink(props, kafkaProps, props.getProperty("****.***.kafka.sink.topic.name"));

dbStream
.map(Object::toString)
.sinkTo(stringSink)
.name("String Kafka Sink");
env.execute("Refresh polling job.");
}catch (Exception e){
log.error("@@@@@@@@ERROR OCCURRED ON POLLING TASK, REASON: {}", e.getMessage());
throw e;
}
}

}
< /code>
Класс функции источника: < /p>
public class JdbcPollingSource extends RichSourceFunction {

private volatile boolean isRunning = true;
private transient Connection connection;
private transient PreparedStatement stmt;

private String url;
private String userName;
private String password;

@Override
public void open(Configuration parameters) throws Exception {
Class.forName("oracle.jdbc.OracleDriver");
super.open(parameters);
Map globalParams = getRuntimeContext()
.getExecutionConfig()
.getGlobalJobParameters()
.toMap();
ParameterTool params = ParameterTool.fromMap(globalParams);
this.url = params.get("flink.jdbc.url");
userName = params.get("flink.jdbc.username");
password = params.get("flink.jdbc.password");
reconnect();
}

private void reconnect() throws Exception {

if (connection != null && !connection.isClosed()) {
try { connection.close(); } catch (Exception ignored) {}
}
try {
connection = DriverManager.getConnection(
url, userName, password
);
stmt = connection.prepareStatement(
"*******"
);
}catch (Exception e){
System.out.println("Error connecting to data source, exception -> " + e.getMessage());
}
}

@Override
public void run(SourceContext  ctx) throws Exception {
while (isRunning) {
try {
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String record = rs.getLong("***") + "," + rs.getString("***");
ctx.collect(record);
}
} catch (SQLException e) {
reconnect();
}
Thread.sleep(120_000);
}
}

@Override
public void cancel() {
isRunning = false;
try { if (stmt != null) stmt.close(); } catch (Exception ignore) {}
try { if (connection != null) connection.close(); } catch (Exception ignore) {}
}
}
возможно ли, что фоновый поток мешает соединению DB?


Подробнее здесь: https://stackoverflow.com/questions/797 ... rrupted-au
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»