фон < /p>
Я новичок в Apache Pekko и R2DBC, работая над реактивным приложением с использованием Pekko-persistence-r2dbc с базой данных PostgreSQL, размещенной на AWS RDS). Мое заявление обрабатывает события через проекции Pekko, чтобы обновить таблицу public.campaigns, но я сталкиваюсь с прерывистой проблемой, в которой выбранное запрос возвращает intaintal.empty, а последующий запрос об обновлении влияет на 0 строк, даже если подтверждается записи кампании. Событие запускает запрос на обновление (Update public.campaigns Установить сообщение_SENT_COUNT = Message_sent_Count + 1, total_cost = total_cost + $ 1, где ID = 2 долл. США;). Время от времени это обновление затрагивает 0 строк, и кампания журналов Show существует с id optional.empty, несмотря на кампанию (ID: 0197BF4A-1AB7-72B3-92EF-EBE128BDB15E), вставленную ~ 500–900 мс ранее. This happens in a low-load environment (single message, no concurrency) and doesn’t occur every time—sometimes the update works correctly.
Environment
Database: PostgreSQL on AWS RDS (unwaveringmedia_platform_prod)
Driver: R2DBC (r2dbc-postgresql)
Framework: Apache Pekko с pekko-persistence-r2dbc < /p>
Пул соединений < /p>
pekko.persistence.r2dbc {
connection-factory {
initial-size = 100
max-size = 100
connect-timeout = 5 seconds
acquire-timeout = 10 seconds
acquire-retry = 1
max-idle-time = 10 minutes
}
query {
behind-current-time = 5000 millis
refresh-interval = 20s
}
}
< /code>
code
public class CampaignTableStatisticsUpdaterProjectionHandler extends R2dbcHandler {
@Override
public CompletionStage process(R2dbcSession session, EventEnvelope eventEventEnvelope) throws Exception {
if (eventEventEnvelope.event().getTimestamp().isBefore(ServiceUtils.BEFORE_INCIDENT)) {
return CompletableFuture.completedFuture(Done.done());
}
log.debug("Processing event: {}, sequence: {}", eventEventEnvelope.event(), eventEventEnvelope.sequenceNr());
if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageSent evt) {
var selectStatement = session
.createStatement("SET TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT id FROM public.campaigns WHERE id = $1;")
.bind(0, evt.getCampaignId());
return session.selectOne(selectStatement, row -> row.get("id"))
.thenCompose(row -> {
log.info("Campaign exists with id {}", row);
if (row == null) {
return CompletableFuture.completedFuture(Done.done());
}
var updateStatement = session
.createStatement("UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;")
.bind(0, evt.getMessageCost())
.bind(1, evt.getCampaignId());
return session.updateOne(updateStatement)
.thenApply(rowsUpdated -> {
log.info("Successfully updated campaign {}, rows affected: {}", evt.getCampaignId(), rowsUpdated);
return Done.done();
});
});
} else if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageCreated evt) {
var statement = session
.createStatement("UPDATE public.campaigns SET message_created_count = message_created_count + 1 WHERE id = $1;")
.bind(0, evt.getCampaignId());
return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
}
return CompletableFuture.completedFuture(Done.done());
}
}
< /code>
Logs
Executing update: PostgresqlStatement{bindings=[Binding{parameters=[Parameter{format=FORMAT_BINARY, type=701, value=MonoSupplier}, Parameter{format=FORMAT_TEXT, type=1043, value=MonoSupplier}]}], context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=PT5S, errorResponseLogLevel=DEBUG, database='unwaveringmedia_platform_prod', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda/0x00007f0cdea42348@234a08ea, forceBinary='true', host='unwavering-media-marketing-platform-db.cl248qc4ol90.us-east-1.rds.amazonaws.com', lockWaitTimeout='null, loopResources='null', noticeLogLevel='DEBUG', options='{}', password='**********************', port=5432, preferAttachedBuffers=true, socket=null, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, username='boban'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@28e388a9, statementCache=LimitedStatementCache{cache={io.r2dbc.postgresql.BoundedStatementCache$CacheKey@9418b150=S_0, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@589270a3=S_1, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@f6d8b11a=S_2, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@de50be46=S_3}, counter=4, client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, limit=5000}}, sql='UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;', generatedColumns=null}
2025-06-30 05:23:18.924 [INFO ] [com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Campaign exists with id Optional.empty MDC: {}
2025-06-30 05:23:18.925 [INFO ] `enter code here`[com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Successfully updated campaign 0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0: MDC: {}
Подробнее здесь: https://stackoverflow.com/questions/796 ... sfully-upd
Непоследовательное поведение обновления с R2DBC и PostgresQL: возвращает успешно обновленную кампанию 0197BF72B3-92EF-EB ⇐ Apache
1751423969
Anonymous
фон < /p>
Я новичок в Apache Pekko и R2DBC, работая над реактивным приложением с использованием Pekko-persistence-r2dbc с базой данных PostgreSQL, размещенной на AWS RDS). Мое заявление обрабатывает события через проекции Pekko, чтобы обновить таблицу public.campaigns, но я сталкиваюсь с прерывистой проблемой, в которой выбранное запрос возвращает intaintal.empty, а последующий запрос об обновлении влияет на 0 строк, даже если подтверждается записи кампании. Событие запускает запрос на обновление (Update public.campaigns Установить сообщение_SENT_COUNT = Message_sent_Count + 1, total_cost = total_cost + $ 1, где ID = 2 долл. США;). Время от времени это обновление затрагивает 0 строк, и кампания журналов Show существует с id optional.empty, несмотря на кампанию (ID: 0197BF4A-1AB7-72B3-92EF-EBE128BDB15E), вставленную ~ 500–900 мс ранее. This happens in a low-load environment (single message, no concurrency) and doesn’t occur every time—sometimes the update works correctly.
Environment
Database: PostgreSQL on AWS RDS (unwaveringmedia_platform_prod)
Driver: R2DBC (r2dbc-postgresql)
Framework: Apache Pekko с pekko-persistence-r2dbc < /p>
Пул соединений < /p>
pekko.persistence.r2dbc {
connection-factory {
initial-size = 100
max-size = 100
connect-timeout = 5 seconds
acquire-timeout = 10 seconds
acquire-retry = 1
max-idle-time = 10 minutes
}
query {
behind-current-time = 5000 millis
refresh-interval = 20s
}
}
< /code>
code
public class CampaignTableStatisticsUpdaterProjectionHandler extends R2dbcHandler {
@Override
public CompletionStage process(R2dbcSession session, EventEnvelope eventEventEnvelope) throws Exception {
if (eventEventEnvelope.event().getTimestamp().isBefore(ServiceUtils.BEFORE_INCIDENT)) {
return CompletableFuture.completedFuture(Done.done());
}
log.debug("Processing event: {}, sequence: {}", eventEventEnvelope.event(), eventEventEnvelope.sequenceNr());
if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageSent evt) {
var selectStatement = session
.createStatement("SET TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT id FROM public.campaigns WHERE id = $1;")
.bind(0, evt.getCampaignId());
return session.selectOne(selectStatement, row -> row.get("id"))
.thenCompose(row -> {
log.info("Campaign exists with id {}", row);
if (row == null) {
return CompletableFuture.completedFuture(Done.done());
}
var updateStatement = session
.createStatement("UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;")
.bind(0, evt.getMessageCost())
.bind(1, evt.getCampaignId());
return session.updateOne(updateStatement)
.thenApply(rowsUpdated -> {
log.info("Successfully updated campaign {}, rows affected: {}", evt.getCampaignId(), rowsUpdated);
return Done.done();
});
});
} else if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageCreated evt) {
var statement = session
.createStatement("UPDATE public.campaigns SET message_created_count = message_created_count + 1 WHERE id = $1;")
.bind(0, evt.getCampaignId());
return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
}
return CompletableFuture.completedFuture(Done.done());
}
}
< /code>
[b]Logs[/b]
Executing update: PostgresqlStatement{bindings=[Binding{parameters=[Parameter{format=FORMAT_BINARY, type=701, value=MonoSupplier}, Parameter{format=FORMAT_TEXT, type=1043, value=MonoSupplier}]}], context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=PT5S, errorResponseLogLevel=DEBUG, database='unwaveringmedia_platform_prod', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda/0x00007f0cdea42348@234a08ea, forceBinary='true', host='unwavering-media-marketing-platform-db.cl248qc4ol90.us-east-1.rds.amazonaws.com', lockWaitTimeout='null, loopResources='null', noticeLogLevel='DEBUG', options='{}', password='**********************', port=5432, preferAttachedBuffers=true, socket=null, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, username='boban'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@28e388a9, statementCache=LimitedStatementCache{cache={io.r2dbc.postgresql.BoundedStatementCache$CacheKey@9418b150=S_0, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@589270a3=S_1, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@f6d8b11a=S_2, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@de50be46=S_3}, counter=4, client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, limit=5000}}, sql='UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;', generatedColumns=null}
2025-06-30 05:23:18.924 [INFO ] [com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Campaign exists with id Optional.empty MDC: {}
2025-06-30 05:23:18.925 [INFO ] `enter code here`[com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Successfully updated campaign 0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0: MDC: {}
Подробнее здесь: [url]https://stackoverflow.com/questions/79684756/inconsistent-update-behavior-with-r2dbc-and-postgresql-returns-successfully-upd[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия