Flux groupBy и новый TraceId для каждой новой порции данныхJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Flux groupBy и новый TraceId для каждой новой порции данных

Сообщение Anonymous »

Я хочу реализовать реактивный потребитель Kafka следующим образом:
https://projectreactor.io/docs/kafka/release/reference/[6.8. Параллельная обработка с упорядочиванием на основе секций
][1]

Код: Выделить всё

Scheduler scheduler = Schedulers.newElastic("sample", 60, true);
KafkaReceiver.create(receiverOptions)
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()));
И я хочу отслеживать каждый опрос. Проблема в том, что с помощью groupBy я могу иметь только одно присвоение TraceId для каждой группы только один раз.
Я упростил код без kafka:

Код: Выделить всё

Flux
.interval(Duration.ofMillis(100)) // kafka messages
.filter {
it % 2L == 0L // for simplicity, we will leave only one group
}
.groupBy {
it % 2L
}
//    .windowTimeout(2, Duration.ofMillis(10))
.flatMap { group ->
val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
Flux.deferContextual {
Mono.just(it.get("traceId2")) //for clarity, two traceId
}.flatMap { traceId2 ->
group.bufferTimeout(2, Duration.ofMillis(10))
.concatMap {
// here I handle my batch
Mono.just(it).delayElement(Duration.ofMillis(50))
}.flatMap {
// and here I will commit
println("$it - $traceId1 - $traceId2")
Mono.just(it)
}
}
.contextWrite {
it.put("traceId2", UUID.randomUUID().toString())
}
}
.blockLast()
и результат:

Код: Выделить всё

[0] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[2] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[4] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[6] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[8] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
[10] - d69da7b2-205b-4742-b538-308192af29d6 - 529c50a5-4966-4b52-bd63-18d4b879d8d4
Тот же TraceId1 и тот же TraceId2.
Но если представить, что группировка выполняется с windowTimeout - то всё будет хорошо:

Код: Выделить всё

Flux
.interval(Duration.ofMillis(100)) // kafka messages
.filter {
it % 2L == 0L // for simplicity, we will leave only one group
}
/*
.groupBy {
it % 2L
}
*/
.windowTimeout(2, Duration.ofMillis(10))
.flatMap { group ->
val traceId1 = UUID.randomUUID().toString() // for clarity, two traceId
Flux.deferContextual {
Mono.just(it.get("traceId2")) //for clarity, two traceId
}.flatMap { traceId2 ->
group.bufferTimeout(2, Duration.ofMillis(10))
.concatMap {
// here I handle my batch
Mono.just(it).delayElement(Duration.ofMillis(50))
}.flatMap {
// and here I will commit
println("$it - $traceId1 - $traceId2")
Mono.just(it)
}
}
.contextWrite {
it.put("traceId2", UUID.randomUUID().toString())
}
}
.blockLast()
и имеем следующий результат:

Код: Выделить всё

[0] - d571b08e-d14c-425e-9062-49de555b6b6e - edef203f-4727-4b9b-bc27-82743e8c16f3
[2] - 8b1b2998-ff83-4b09-990c-93b3ad8e386b - e4b00092-e4b7-4f54-aa8a-90ac4882dea3
[4] - 323450bf-6d2c-4e39-8534-7d7ea63fbc31 - 1be0022c-88f4-4339-a00f-7e9f4f5ed45d
[6] - 8db7d902-5047-4772-9ebb-9b8bef983126 - ce92b804-ac72-4af1-b353-5c7576d71a01
[8] - 838e230c-fdb2-4e33-901e-96b7cbf2543e - f355382e-54ef-4a02-b2ed-a32bf855563c
[10] - d8bfc951-98a1-428a-b697-ebc447a78a40 - 916d2824-4e76-47fb-8573-653e954841eb
Разные TraceId1 и разные TraceId2.
Как этого добиться, используя groupBy или, возможно, в этом случае есть замена groupBy?

Подробнее здесь: https://stackoverflow.com/questions/786 ... on-of-data
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»