Я использую org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2 Зависимость в реализации моего проекта.
my Application.yml file < /p>
Код: Выделить всё
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file:
role-arn:
role-session-name: RoleSessionName
region:
static:
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
< /code>
Ниже приведен класс Java, который содержат бобы для обработки данных из Kinesis < /p>
@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
@Configuration
public class KinesisConfig {
private final DataSource dataSource;
@Autowired
KinesisConfig(@Qualifier("dataSource") DataSource dataSource){ this.dataSource = dataSource;}
@Bean
public LockRepository lockRepository(){
DefaultLockRepository lockRepository = new DefaultLockRepository(dataSource);
return lockRepository;
}
@Bean
public LockRegistry lockRegistry(LockRepository lockRepository){return new JdbcLockRegistry(lockRepository);}
@Bean
public ConcurrentMetadataStore metadataStore(){
JdbcMetadataStore metadataStore = new JdbcMetadataStore(dataSource);
return metadataStore;
}
}
< /code>
Согласно моему предыдущему вопросу, который я задал ниже ссылки < /p>
Можно ли использовать PostgreSQL вместо динамо Dynamo Demonac и блокировка в случае потребления данных из кинезиса с использованием подхода Binder < /li>
Проверка и блокировку в Amazon Kinesis с использованием Postgresql < /li>
< /ul>
Я сделал то же решение, предоставленное, и это работает для меня. Я смог использовать PostgreSQL для цели контрольной точки и блокировки. Ошибка. < /p>
Существует два стручка, в которых используется та же кодовая база, которая потребляет сообщения от Kinesis "test-test.tst.v1" < /p>
Ниже приведен Создать запрос для таблицы блокировки < /p>
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
< /code>
анализ < /h3>
Ошибка: дубликат значения клавиши нарушает уникальное ограничение "int_lock_pk"
detail : Key (lock_key, region) = (2b062295-539b-38eb-a544-a6dd9214e50b, по умолчанию) уже существует. $ 2, $ 3, $ 4) < /p>
< /blockquote>
Файл журнала генерируется каждую минуту с этой ошибкой. < /P>
шаги для воспроизведения < /h4>
Я заменил DynamoDB на дБ PostgreSQL для блокировки стручков и контрольной точки. Переопределить конфигурацию по умолчанию (Dynamo DB, контрольная точка, метадатастор) с классом Kinesisconfig выше. < /P>
< /li>
Я добавил таблицу блокировки и проверки, используя таблицу с использованием указания, используя Ниже сценарий < /p>
CREATE TABLE INT_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CREATED_DATE TIMESTAMP NOT NULL,
MESSAGE_BYTES BYTEA,
constraint INT_MESSAGE_PK primary key (MESSAGE_ID, REGION)
);
CREATE INDEX INT_MESSAGE_IX1 ON INT_MESSAGE (CREATED_DATE);
CREATE TABLE INT_GROUP_TO_MESSAGE (
GROUP_KEY CHAR(36) NOT NULL,
MESSAGE_ID CHAR(36) NOT NULL,
REGION VARCHAR(100),
constraint INT_GROUP_TO_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION)
);
CREATE TABLE INT_MESSAGE_GROUP (
GROUP_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
GROUP_CONDITION VARCHAR(255),
COMPLETE BIGINT,
LAST_RELEASED_SEQUENCE BIGINT,
CREATED_DATE TIMESTAMP NOT NULL,
UPDATED_DATE TIMESTAMP DEFAULT NULL,
constraint INT_MESSAGE_GROUP_PK primary key (GROUP_KEY, REGION)
);
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE;
CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'),
MESSAGE_BYTES BYTEA,
REGION VARCHAR(100) NOT NULL,
constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
CREATE INDEX INT_CHANNEL_MSG_DELETE_IDX ON INT_CHANNEL_MESSAGE (REGION, GROUP_KEY, MESSAGE_ID);
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
< /code>
< /li>
< /ol>
желаемое поведение < /h4>
не должно создавать дубликат ключа, нарушает уникальные ограничения Заблокировав стручок с помощью зависимости-скрещивания-копленговой банки-кинезис.
Как я могу решить эту проблему?
Подробнее здесь: https://stackoverflow.com/questions/794 ... locking-in