https://projectreactor.io/docs/kafka/release/reference/[6.8. Параллельная обработка с упорядочиванием на основе секций
][1]
Код: Выделить всё
Scheduler scheduler = Schedulers.newElastic("sample", 60, true);
KafkaReceiver.create(receiverOptions)
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()));
Я упростил код без kafka:
Код: Выделить всё
Flux
.interval(Duration.ofMillis(100)) // kafka messages
.filter {
it % 2L == 0L // for simplicity, we will leave only one group
}
.groupBy {
it % 2L
}
// .windowTimeout(2, Duration.ofMillis(10))
.flatMap { group ->
val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
Flux.deferContextual {
Mono.just(it.get("traceId2")) //for clarity, two traceId
}.flatMap { traceId2 ->
group.bufferTimeout(2, Duration.ofMillis(10))
.concatMap {
// here I handle my batch
Mono.just(it).delayElement(Duration.ofMillis(50))
}.flatMap {
// and here I will commit
println("$it - $traceId1 - $traceId2")
Mono.just(it)
}
}
.contextWrite {
it.put("traceId2", UUID.randomUUID().toString())
}
}
.blockLast()
Код: Выделить всё
[0] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[2] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[4] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[6] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[8] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[10] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
Но если представить, что группировка выполняется с windowTimeout - то всё будет хорошо:
Код: Выделить всё
Flux
.interval(Duration.ofMillis(100)) // kafka messages
.filter {
it % 2L == 0L // for simplicity, we will leave only one group
}
/*
.groupBy {
it % 2L
}
*/
.windowTimeout(2, Duration.ofMillis(10))
.flatMap { group ->
val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
Flux.deferContextual {
Mono.just(it.get("traceId2")) //for clarity, two traceId
}.flatMap { traceId2 ->
group.bufferTimeout(2, Duration.ofMillis(10))
.concatMap {
// here I handle my batch
Mono.just(it).delayElement(Duration.ofMillis(50))
}.flatMap {
// and here I will commit
println("$it - $traceId1 - $traceId2")
Mono.just(it)
}
}
.contextWrite {
it.put("traceId2", UUID.randomUUID().toString())
}
}
.blockLast()
Код: Выделить всё
[0] - d571b08e-d14c-425e-9062-49de555b6b6e - edef203f-4727-4b9b-bc27-82743e8c16f3
[2] - 8b1b2998-ff83-4b09-990c-93b3ad8e386b - e4b00092-e4b7-4f54-aa8a-90ac4882dea3
[4] - 323450bf-6d2c-4e39-8534-7d7ea63fbc31 - 1be0022c-88f4-4339-a00f-7e9f4f5ed45d
[6] - 8db7d902-5047-4772-9ebb-9b8bef983126 - ce92b804-ac72-4af1-b353-5c7576d71a01
[8] - 838e230c-fdb2-4e33-901e-96b7cbf2543e - f355382e-54ef-4a02-b2ed-a32bf855563c
[10] - d8bfc951-98a1-428a-b697-ebc447a78a40 - 916d2824-4e76-47fb-8573-653e954841eb
Как этого добиться, используя groupBy или, возможно, в этом случае есть замена groupBy?
Подробнее здесь: https://stackoverflow.com/questions/786 ... on-of-data
Мобильная версия