Вот фрагмент кода.
Код: Выделить всё
static class BatchUpsertFn extends DoFn {
private final ConcurrentHashMap workerHashmap;
private final String instanceId;
private List batch;
BatchUpsertFn() {
this.workerHashmap = new ConcurrentHashMap();
this.instanceId = UUID.randomUUID().toString();
}
@Setup
public void createMongoClient() {
workerHashmap.putIfAbsent(instanceId, MongoClients
.create(MongoClientSettings.builder()
.applyConnectionString(new ConnectionString("myUri"))
.applyToConnectionPoolSettings(builder -> builder
.maxConnectionIdleTime(60, SECONDS)
.maxConnectionLifeTime(60, SECONDS)
.maxSize(10)
.minSize(1)
)
.applyToSocketSettings(builder -> builder
.connectTimeout(10, SECONDS)
.readTimeout(20, SECONDS))
.build()));
}
@StartBundle
public void startBundle() {
batch = new ArrayList();
}
@ProcessElement
public void processElement(ProcessContext ctx) {
batch.add(
new UpdateManyModel(
ctx.element().getKey(),
ctx.element().getValue(),
new UpdateOptions().upsert(true)));
// If batch limit is exceeded
if (batch.size() >= 1024L) {
try {
flush();
} catch (MongoBulkWriteException pException) {
pException.getWriteErrors()
.forEach(pBulkWriteError ->
ctx.output(new CustomBulkWriteError(pBulkWriteError)));
}
}
}
@FinishBundle
public void finishBundle(FinishBundleContext ctx) {
try {
flush();
} catch (MongoBulkWriteException pException) {
pException.getWriteErrors()
.forEach(pBulkWriteError ->
ctx.output(new CustomBulkWriteError(pBulkWriteError),
Instant.now(),
GlobalWindow.INSTANCE));
}
}
private void flush() throws MongoBulkWriteException {
if (batch.isEmpty()) {
return;
}
MongoDatabase mongoDatabase = workerHashmap.get(instanceId).getDatabase("myDatabase");
MongoCollection mongoCollection = mongoDatabase.getCollection("myCollection");
try {
mongoCollection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
batch.clear();
} catch (MongoBulkWriteException pException) {
batch.clear();
throw pException;
}
}
@Teardown
public void closeMongoClient() {
workerHashmap.get(instanceId).close();
}
}
Подробнее здесь: https://stackoverflow.com/questions/764 ... in-mongodb