Как мне этого добиться? Есть ли другой способ, кроме как вызвать KILL
для отдельного соединения? Если да, то я могу это сделать, но не понимаю, как определить точный PID моего длительного запроса. Есть ли для этого надежный метод?
Короче, как я могу, используя MySQL Connector J, отменить длительный запрос по требованию? Я был бы очень признателен за ссылку на рабочий пример с кодом, если это вообще возможно.
Для справки, вот пример кода, написанного на Kotlin:
< pre class="lang-kotlin Prettyprint-override">
Код: Выделить всё
val data = produce {
db.connection.use { conn ->
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY).use { stmt ->
stmt.fetchSize = Int.MIN_VALUE
stmt.executeQuery("...").use { rs ->
var batch = mutableListOf()
while (rs.next()) {
// Handle result set here
val data = Data(
id = rs.getLong("id"),
subAccountId = rs.getInt("sub_account_id")
)
batch.add(data)
if (batch.size >= 100) {
println("Sending batch...")
send(batch)
if (done) {
println("Done, closing...")
close()
break
}
batch = mutableListOf()
}
}
println("Out of loop, cancelling statement")
synchronized(this) { // my lame attempt at cancelling
println("In synchronized")
if (!stmt.isClosed) {
println("Statement is not yet closed") // DOES print
stmt.cancel()
stmt.close()
conn.close() // probably pointless but I tried
}
}
}
println("Out of statement") // does NOT print
}
println("Out of connection") // does NOT print
}
println("End of production...") // does NOT print
}
РЕДАКТИРОВАТЬ:
p>
Я наткнулся на обсуждение на этом форуме и после некоторых возни придумал этот код:
Код: Выделить всё
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun dbSqlKillTestWithCoroutines() {
runBlocking {
db.connection.use { conn ->
conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY).use { stmt ->
stmt.fetchSize = Int.MIN_VALUE
// Get connection ID
val connectionId = stmt.executeQuery("SELECT CONNECTION_ID()").use { rs ->
rs.next()
rs.getString(1)
}
// Produce data
val data = produce(Dispatchers.IO) { // Dispatchers.IO is not necessary for query to be killed
stmt.executeQuery("SELECT ...").use { rs -> // omitted SQL
var batch = mutableListOf()
while (rs.next()) {
val data = Data(...) // omitted mapper
batch.add(data)
if (batch.size >= 100) {
println("Sending batch")
send(batch)
batch = mutableListOf()
}
}
}
}
// Consume data
var count = 0
for (batch in data) {
count++
println("Received batch $count")
if (count >= 3) {
println("Attempting to cancel query...")
println("Breaking...")
break
}
}
// Method 3, no new thread at all
// println("Breaking in same context...")
// db.connection.use { conn2 ->
//// conn2.catalog = "information_schema"
//// conn2.autoCommit = true
// conn2.createStatement().use { stmt2 ->
// stmt2.execute("KILL QUERY $connectionId")
// }
// }
// Method 2, use coroutines to create Thread
// newSingleThreadContext("ctx").use { ctx ->
// launch(ctx) {
// println("Breaking in single thread context...")
// db.connection.use { conn2 ->
//// conn2.catalog = "information_schema"
//// conn2.autoCommit = true
// conn2.createStatement().use { stmt2 ->
// stmt2.execute("KILL QUERY $connectionId")
// }
// }
// }
// }
// Method 1, create raw Thread() instance manually
thread(true) {
println("Breaking in thread...")
db.connection.use { conn2 ->
// conn2.catalog = "information_schema"
// conn2.autoCommit = true
conn2.createStatement().use { stmt2 ->
stmt2.execute("KILL QUERY $connectionId")
}
}
}
}
}
}
}
Короче, этот код вроде работает, поэтому буду использовать его в своих целях. Моя цель — запустить длительный запрос SELECT (выполнение которого может занять неделю или больше) и обрабатывать данные по мере их поступления, чтобы не потреблять слишком много памяти (это может занять много гигабайт памяти). в противном случае). Затем я хочу быть готовым устранить любые ошибки, возникающие при обработке, остановив запрос и обработку. В моем случае я могу остановиться в любой момент и продолжить с того места, на котором остановился, и мне не придется перезапускать запрос с нуля.
Если кто-нибудь более знающий может прийти и внести свой вклад, как сделать его лучше/более пуленепробиваемым, лично я буду признателен.
Подробнее здесь: https://stackoverflow.com/questions/786 ... y-via-jdbc
Мобильная версия