Java - Apache Beam - Количество управления соединений при написании в MongoDBJAVA

Программисты JAVA общаются здесь
Anonymous
Java - Apache Beam - Количество управления соединений при написании в MongoDB

Сообщение Anonymous »

В настоящее время я работаю над потоковым конвейером в Beam Apache (v2.43), чтобы вставить данные в MongoDB. Он работает на DATAFLOW довольно хорошо, но я не могу контролировать количество соединений: в случае пика ввода (PubSub), DataFlow масштабирует и переполняет монго тысячами соединений. Даже с ограниченным числом работников ( 15 ) я достигаю более 20 тыс. Соединений со стороны MongoDB. Чтобы улучшить выступления, я накапливаю записи до достижения ряда элементов, а затем промываю буфер. Я поместил максимальный размер элементов 10 в подключаемые установки, поэтому я ожидал не более 150 соединений максимум (15 рабочих * 10 соединение/работник).
Вот фрагмент кода.

Код: Выделить всё

static class BatchUpsertFn extends DoFn {

private final ConcurrentHashMap workerHashmap;
private final String instanceId;
private List batch;

BatchUpsertFn() {
this.workerHashmap = new ConcurrentHashMap();
this.instanceId = UUID.randomUUID().toString();
}

@Setup
public void createMongoClient() {
workerHashmap.putIfAbsent(instanceId, MongoClients
.create(MongoClientSettings.builder()
.applyConnectionString(new ConnectionString("myUri"))
.applyToConnectionPoolSettings(builder -> builder
.maxConnectionIdleTime(60, SECONDS)
.maxConnectionLifeTime(60, SECONDS)
.maxSize(10)
.minSize(1)
)
.applyToSocketSettings(builder -> builder
.connectTimeout(10, SECONDS)
.readTimeout(20, SECONDS))
.build()));
}

@StartBundle
public void startBundle() {
batch = new ArrayList();
}

@ProcessElement
public void processElement(ProcessContext ctx) {

batch.add(
new UpdateManyModel(
ctx.element().getKey(),
ctx.element().getValue(),
new UpdateOptions().upsert(true)));

// If batch limit is exceeded
if (batch.size() >= 1024L) {
try {
flush();
} catch (MongoBulkWriteException pException) {
pException.getWriteErrors()
.forEach(pBulkWriteError ->
ctx.output(new CustomBulkWriteError(pBulkWriteError)));
}
}
}

@FinishBundle
public void finishBundle(FinishBundleContext ctx) {
try {
flush();
} catch (MongoBulkWriteException pException) {
pException.getWriteErrors()
.forEach(pBulkWriteError ->
ctx.output(new CustomBulkWriteError(pBulkWriteError),
Instant.now(),
GlobalWindow.INSTANCE));
}
}

private void flush() throws MongoBulkWriteException {
if (batch.isEmpty()) {
return;
}
MongoDatabase mongoDatabase = workerHashmap.get(instanceId).getDatabase("myDatabase");
MongoCollection mongoCollection = mongoDatabase.getCollection("myCollection");
try {
mongoCollection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
batch.clear();
} catch (MongoBulkWriteException pException) {
batch.clear();
throw pException;
}
}

@Teardown
public void closeMongoClient() {
workerHashmap.get(instanceId).close();
}
}
Любая идея более чем приветствуется

Подробнее здесь: https://stackoverflow.com/questions/764 ... in-mongodb

Вернуться в «JAVA»