my Application.yml file
Код: Выделить всё
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file:
role-arn:
role-session-name: RoleSessionName
region:
static:
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
< /code>
Ниже приведен класс Java, который содержат бобы для обработки данных из Kinesis < /p>
@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
@Configuration
public class KinesisConfig {
private final DataSource dataSource;
@Autowired
KinesisConfig(@Qualifier("dataSource") DataSource dataSource){ this.dataSource = dataSource;}
@Bean
public LockRepository lockRepository(){
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
return lockRepository;
}
@Bean
public LockRegistry lockRegistry(LockRepository lockRepository){return new JdbcLockRegistry(lockRepository);}
@Bean
public ConcurrentMetadataStore metadataStore(){
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
return metadataStore;
}
}
< /code>
Согласно моему предыдущему вопросу, который я задавал ниже ссылки < /p>
Можно ли использовать PostgreSQL вместо динамо Dynamo по умолчанию DB для контрольной точки и блокировка в случае потребления данных из кинезиса с использованием подхода Binder < /li>
Проверка и блокировку в Amazon Kinesis с использованием Postgresql < /li>
< /ol>
Я сделал то же решение, предоставленное и его работа для меня. Я смог использовать PostgreSQL для контрольной точки и блокировки. p>
Существует две Pod, использующие одну и ту же кодовую базу, которая потребляет сообщение от Kinesis "test-test.tst.v1" Таблица < /p>
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
< /code>
анализ < /strong> < /p>
ошибка: дубликат значения ключа нарушает уникальное ограничение "int_lock_pk"
detail: key (lock_key , область) = (2B062295-539B-38EB-A544-A6DD9214E50B, по умолчанию) уже существует. p>
Файл журнала генерируется в каждой мине с этой ошибкой. < /p>
шаги для воспроизведения < /strong> < /p>
Я заменил Dynamo DB на постгресту DB для блокировки стручков и контрольной точки. Переопределить конфигурацию по умолчанию (Dynamo DB, контрольная точка, метадатастор) с классом Kinesisconfig выше < /p>
< /li>
Я добавил таблицу блокировки и проверки, используя ниже Скрипт < /p>
Создать таблицу int_message (
message_id char (36) не null,
region varchar (100) не null,
create_date timeStam br /> message_bytes bytea,
ограничение int_message_pk Первичный ключ (Message_id, Region)
); < /p>
Создать индекс int_message_ix1 на int_message (create_date); < /p> < /p> Создать таблицу int_group_to_message (
group_key char (36) не null,
message_id char (36) не null,
region varchar (100),
constraint int_group_to_message_pk Первичный ключ (Group_key, Message_id, Region)
); < /p>
Создание таблицы int_message_group (
Group_key char (36) не null,
grain varchar (100 ) Не null,
group_condition varchar (255),
complete bigint,
last_relead_ sequence bigint,
create_date timeStam Ограничение int_message_group_pk Первичный ключ (Group_key, Region)
); < /p>
Создание таблицы int_lock (
lock_key char (36) не null,
region varchar (100 ) Не null,
client_id char (36),
create_date timeStamp не null,
ограничение int_lock_pk Первичный ключ (lock_key, region)
); < /p>
Создать последовательность int_message_seq Начните с 1 приращения на 1 без цикла; < /p>
Создать таблицу int_channel_message (
message_id char (36) не null,
Group_key char (36 ) Не null,
create_date bigint not null,
message_priority bigint,
message_ sequence bigint not null default nextval ('int_message_seq'),
message_bytes bytea,
region varhar ( 100) Не null,
constraint int_channel_message_pk Первичный ключ (область, group_key, create_date, message_sectence)
); < /p>
Создание индекса int_channel_msg_delete_idx на int_channe_messag ); < /p>
Создать таблицу int_metadata_store (
metadata_key varchar (255) Не nul ,
ограничение int_metadata_store_pk Первичный ключ (metadata_key, region)
); < /p>
< /li>
< /ol>
Желаемое поведение < /strong>
Это не должно создавать дубликацию значения ключа, нарушает уникальные ограничения при блокировании POD с помощью зависимости-spring-cloud-d-d-binder-kinesis jar. < /P>
Может ли кто -нибудь, пожалуйста, помочь мне решить эту проблему.
Подробнее здесь: https://stackoverflow.com/questions/794 ... sing-postg