Мне нужно загрузить данные json из темы kafka в таблицу postgres.
Ниже приведена структура таблицы
Код: Выделить всё
CREATE TABLE dup_emp (
emp_id integer PRIMARY KEY,
emp_name text,
emp_salary integer
);
Пример, {"emp_id":1,"emp_name":"bheem", "emp_salary":2000
И моя конфигурация соединителя следующая: -
Код: Выделить всё
curl -X PUT http://localhost:8083/connectors/load_test/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/somedb",
"connection.user":"user",
"connection.password":"passwd",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"tasks.max" : "1",
"topics":"dup_emp",
"table.name.format":"dup_emp",
"insert.mode":"insert",
"quote.sql.identifiers":"never"
}'
Вызвано: org.apache.kafka.connect.errors.ConnectException: Соединитель приемника «load_test» настроен с использованием «delete.enabled=false» и «pk.mode=none» и поэтому требует записей с ненулевым значением структуры и ненулевой схемой структуры, но найдена запись по адресу (topic='dup_emp ',partition=0,offset=0,timestamp=1633066307312) со значением HashMap и схемой нулевых значений.
Насколько мне известно, когда мы используем исходный соединитель, это схема будет храниться в реестре схем и использоваться соединителем приемника для проверки. Но теперь, когда я не использую исходный соединитель и вручную создаю данные в формате json, как я могу загрузить данные в postgres.
Может кто-нибудь помочь мне понять суть исправить ошибку и предложить мне правильную конфигурацию для загрузки данных в postgres.
Подробнее здесь: https://stackoverflow.com/questions/694 ... -connector