Реактивный код:
Версия драйвера:
org.mongodbmongodb-driver-reactivestreams
5.0.1
Код: Выделить всё
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
public class MongoDBExample2 {
private static final Logger logger = LoggerFactory.getLogger(MongoDBExample2.class);
private static String str;
static {
try {
str = new String(Files.readAllBytes(Paths.get("
")));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MongoClient mongoClient = MongoClients.create("");
// Get a database and collection
MongoDatabase database = mongoClient.getDatabase("testdb");
MongoCollection collection = database.getCollection("mock1");
List documents = getMockData();
CompletableFuture futureResult = bulkWriteDocuments(collection, documents);
futureResult.thenAccept(bulkWriteResult -> {
logger.debug("Bulk write completed: {}", bulkWriteResult);
}).exceptionally(throwable -> {
logger.error("Error occurred during bulk write: ", throwable);
return null;
});
futureResult.get();
mongoClient.close();
}
private static List getMockData() {
List docs = new ArrayList();
List documents = generateLargeDocument(30_000);
for (Document doc : documents) {
docs.add(new InsertOneModel(doc));
}
return docs;
}
public static CompletableFuture bulkWriteDocuments(MongoCollection collection, List documents) {
CompletableFuture future = new CompletableFuture();
Instant start = Instant.now();
collection.bulkWrite(documents).subscribe(new Subscriber() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}
@Override
public void onNext(BulkWriteResult bulkWriteResult) {
System.out.println(" Time taken : " + Duration.between(start, Instant.now()).getSeconds());
future.complete(bulkWriteResult);
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
if (!future.isDone()) {
future.completeExceptionally(new RuntimeException("Bulk write operation completed without emitting a result"));
}
}
});
return future;
}
private static List generateLargeDocument(int numDocuments) {
List documents = new ArrayList(numDocuments);
Instant op = Instant.now();
for (int i = 0; i < numDocuments; i++) {
Document document = new Document();
document.append("Idd", UUID.randomUUID().toString())
.append("content", str);
documents.add(document);
}
System.out.println("Time taken to generate docs : " + Duration.between(op, Instant.now()).getSeconds());
return documents;
}
}
Версия драйвера:
org.mongodb
mongodb-driver3.10.0
Код: Выделить всё
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class MongoDBExample {
private static final Logger logger = LoggerFactory.getLogger(MongoDBExample.class);
private static String str;
static {
try {
str = new String(Files.readAllBytes(Paths.get("
")));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
MongoClient mongoClient = MongoClients.create("");
MongoDatabase database = mongoClient.getDatabase("testdb");
MongoCollection collection = database.getCollection("mock1");
List documents = getMockData();
Instant start = Instant.now();
collection.bulkWrite(documents);
System.out.println(
"Time taken to generate docs : " + Duration.between(start, Instant.now()).getSeconds());
mongoClient.close();
}
private static List getMockData() {
List docs = new ArrayList();
List documents = generateLargeDocument(30_000);
for (Document doc : documents) {
docs.add(new InsertOneModel(doc));
}
return docs;
}
private static List generateLargeDocument(int numDocuments) {
List documents = new ArrayList(numDocuments);
Instant op = Instant.now();
for (int i = 0; i < numDocuments; i++) {
Document document = new Document();
document.append("Idd", UUID.randomUUID().toString())
.append("content", str);
documents.add(document);
}
System.out.println("Time taken to generate docs : " + Duration.between(op, Instant.now()).getSeconds());
return documents;
}
}
Среднее время, затраченное драйверами синхронизации на передачу 30 000 записей: 2259 секунд
Как интегрировать/обновить реактивную библиотеку mongoDb, чтобы повысить производительность. Есть ли какой-либо конкретный способ реализации, работы с потоками Java или чем-то еще.
В нашем проекте мы обрабатываем примерно 10 000 операций вставки, обновления и удаления в секунду, мы хотели бы улучшить этот показатель. В настоящее время мы в основном используем функцию массовой записи драйверов синхронизации.
Подробнее здесь: https://stackoverflow.com/questions/786 ... grating-fr