Непоследовательное поведение обновления с R2DBC и PostgresQL: возвращает успешно обновленную кампанию 0197BF72B3-92EF-EBApache

Ответить
Anonymous
 Непоследовательное поведение обновления с R2DBC и PostgresQL: возвращает успешно обновленную кампанию 0197BF72B3-92EF-EB

Сообщение Anonymous »

фон < /p>
Я новичок в Apache Pekko и R2DBC, работая над реактивным приложением с использованием Pekko-persistence-r2dbc с базой данных PostgreSQL, размещенной на AWS RDS). Мое заявление обрабатывает события через проекции Pekko, чтобы обновить таблицу public.campaigns, но я сталкиваюсь с прерывистой проблемой, в которой выбранное запрос возвращает intaintal.empty, а последующий запрос об обновлении влияет на 0 строк, даже если подтверждается записи кампании. Событие запускает запрос на обновление (Update public.campaigns Установить сообщение_SENT_COUNT = Message_sent_Count + 1, total_cost = total_cost + $ 1, где ID = 2 долл. США;). Время от времени это обновление затрагивает 0 строк, и кампания журналов Show существует с id optional.empty, несмотря на кампанию (ID: 0197BF4A-1AB7-72B3-92EF-EBE128BDB15E), вставленную ~ 500–900 мс ранее. This happens in a low-load environment (single message, no concurrency) and doesn’t occur every time—sometimes the update works correctly.
Environment
Database: PostgreSQL on AWS RDS (unwaveringmedia_platform_prod)
Driver: R2DBC (r2dbc-postgresql)
Framework: Apache Pekko с pekko-persistence-r2dbc < /p>
Пул соединений < /p>
pekko.persistence.r2dbc {
connection-factory {
initial-size = 100
max-size = 100
connect-timeout = 5 seconds
acquire-timeout = 10 seconds
acquire-retry = 1
max-idle-time = 10 minutes
}
query {
behind-current-time = 5000 millis
refresh-interval = 20s
}
}
< /code>
code
public class CampaignTableStatisticsUpdaterProjectionHandler extends R2dbcHandler {
@Override
public CompletionStage process(R2dbcSession session, EventEnvelope eventEventEnvelope) throws Exception {
if (eventEventEnvelope.event().getTimestamp().isBefore(ServiceUtils.BEFORE_INCIDENT)) {
return CompletableFuture.completedFuture(Done.done());
}
log.debug("Processing event: {}, sequence: {}", eventEventEnvelope.event(), eventEventEnvelope.sequenceNr());
if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageSent evt) {
var selectStatement = session
.createStatement("SET TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT id FROM public.campaigns WHERE id = $1;")
.bind(0, evt.getCampaignId());
return session.selectOne(selectStatement, row -> row.get("id"))
.thenCompose(row -> {
log.info("Campaign exists with id {}", row);
if (row == null) {
return CompletableFuture.completedFuture(Done.done());
}
var updateStatement = session
.createStatement("UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;")
.bind(0, evt.getMessageCost())
.bind(1, evt.getCampaignId());
return session.updateOne(updateStatement)
.thenApply(rowsUpdated -> {
log.info("Successfully updated campaign {}, rows affected: {}", evt.getCampaignId(), rowsUpdated);
return Done.done();
});
});
} else if (eventEventEnvelope.event() instanceof OutboundCampaignMessageEntity.MessageCreated evt) {
var statement = session
.createStatement("UPDATE public.campaigns SET message_created_count = message_created_count + 1 WHERE id = $1;")
.bind(0, evt.getCampaignId());
return session.updateOne(statement).thenApply(rowsUpdated -> Done.done());
}
return CompletableFuture.completedFuture(Done.done());
}
}
< /code>
Logs
Executing update: PostgresqlStatement{bindings=[Binding{parameters=[Parameter{format=FORMAT_BINARY, type=701, value=MonoSupplier}, Parameter{format=FORMAT_TEXT, type=1043, value=MonoSupplier}]}], context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@714d4d6}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=PT5S, errorResponseLogLevel=DEBUG, database='unwaveringmedia_platform_prod', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda/0x00007f0cdea42348@234a08ea, forceBinary='true', host='unwavering-media-marketing-platform-db.cl248qc4ol90.us-east-1.rds.amazonaws.com', lockWaitTimeout='null, loopResources='null', noticeLogLevel='DEBUG', options='{}', password='**********************', port=5432, preferAttachedBuffers=true, socket=null, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, username='boban'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@28e388a9, statementCache=LimitedStatementCache{cache={io.r2dbc.postgresql.BoundedStatementCache$CacheKey@9418b150=S_0, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@589270a3=S_1, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@f6d8b11a=S_2, io.r2dbc.postgresql.BoundedStatementCache$CacheKey@de50be46=S_3}, counter=4, client=io.r2dbc.postgresql.client.ReactorNettyClient@38bf4137, limit=5000}}, sql='UPDATE public.campaigns SET message_sent_count = message_sent_count + 1, total_cost = total_cost + $1 WHERE id = $2;', generatedColumns=null}
2025-06-30 05:23:18.924 [INFO ] [com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Campaign exists with id Optional.empty MDC: {}
2025-06-30 05:23:18.925 [INFO ] `enter code here`[com.unwaveringmedia.platform.message.projection.updater.campaign.CampaignTableStatisticsUpdaterProjectionHandler] [] [ForkJoinPool.commonPool-worker-5] - Successfully updated campaign 0197bf4a-1ab7-72b3-92ef-ebe128bdb15e rows affected 0: MDC: {}


Подробнее здесь: https://stackoverflow.com/questions/796 ... sfully-upd
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Apache»