Код: Выделить всё
Instant threshold = Instant.now().minus(retention);
String query = String.format("delete from %s.%s.%s where _lastmodified < \"%s\"", bucketName, scopeName, collectionName, threshold);
QueryResult result = cluster.query(query);
Ответ заключается в потоковой передаче данных: прочитайте столько, сколько сможете переварить, переварите и забудьте. Повторяйте до тех пор, пока не закончите.
В документации Couchbase упоминается о переключении на реактивные и асинхронные API:
Реактивные и асинхронные API
[...]
Кроме того, есть еще одна причина, по которой вы хотите использовать реактивный API: потоковая передача больших результатов с противодавлением со стороны приложения. И блокирующий, и асинхронный API не имеют средств для правильной сигнализации противодавления, поэтому, если вам это нужно, реактивный API – ваш лучший вариант.
Поэтому я изменил свой фрагмент кода на следующий:
Код: Выделить всё
Instant threshold = Instant.now().minus(retention);
String query = String.format("delete from %s.%s.%s where _lastmodified < \"%s\"", bucketName, scopeName, collectionName, threshold);
ReactiveCluster rc = couchbase.reactive();
try {
rc.query(query,
QueryOptions.queryOptions()
.readonly(true)
.scanConsistency(QueryScanConsistency.REQUEST_PLUS)
.metrics(true)
).flux().flatMap(result -> {
Flux rows = result.rowsAs(JsonObject.class);
return rows;
}).subscribe(row -> {
// process one row
}, (Throwable t) -> {
log.error("Could not read data", t);
}
);
} finally {
rc.disconnect();
}
Подробнее здесь: https://stackoverflow.com/questions/797 ... -async-api
Мобильная версия