my Application.yml file
Код: Выделить всё
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file:
role-arn:
role-session-name: RoleSessionName
region:
static:
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
< /code>
Ниже приведен класс Java, который содержат бобы для обработки данных из Kinesis < /p>
@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
@Configuration
public class KinesisConfig {
private final DataSource dataSource;
@Autowired
KinesisConfig(@Qualifier("dataSource") DataSource dataSource){ this.dataSource = dataSource;}
@Bean
public LockRepository lockRepository(){
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
return lockRepository;
}
@Bean
public LockRegistry lockRegistry(LockRepository lockRepository){return new JdbcLockRegistry(lockRepository);}
@Bean
public ConcurrentMetadataStore metadataStore(){
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
return metadataStore;
}
}
< /code>
Согласно моему предыдущему вопросу, который я задавал ниже ссылки < /p>
Можно ли использовать PostgreSQL вместо динамо Dynamo по умолчанию DB для контрольной точки и блокировка в случае потребления данных из кинезиса с использованием подхода Binder < /li>
Проверка и блокировку в Amazon Kinesis с использованием Postgresql < /li>
< /ol>
Я сделал то же решение, предоставленное и его работа для меня. Я смог использовать PostgreSQL для контрольной точки и блокировки. p>
Существует две Pod, использующие одну и ту же кодовую базу, которая потребляет сообщение от Kinesis "test-test.tst.v1" Таблица < /p>
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
< /code>
ошибки < /strong> < /p>
Ошибка: дубликат значения ключа нарушает уникальное ограничение "int_lock_pk"
detail: key (lock_key , область) = (2B062295-539B-38EB-A544-A6DD9214E50B, по умолчанию) уже существует. p>
может кто -нибудь помочь мне решить эту проблему. < /p>
Подробнее здесь: https://stackoverflow.com/questions/794 ... sing-postg