Резюме: Мне нужно отправить записи в тему Kafka, и с помощью соединителя приемника сервера Microsoft sql нужно получить запись из темы и отправить в базу данных.
**
Подходы я пытался отправлять записи в тему.**
1) Ручной подход (создать кнопку сообщения)
2) Rest api (почтальон) < /p>
3)Приложение для создания Java (с использованием Ide)
4)Процесс Boomi
**
Проблема:**
Каждый раз, когда я создаю записи вручную (в кнопке создания сообщения в слитном облаке), они отправляются в тему и соединитель приемника mssql обработает запись и отправит ее в базу данных → Этот случай успешен
Но если я отправлю с помощью третьих сторон например, Boomi, Postman, Java-приложение ==> Записи отправляются в тему, но соединитель приемника mssql отправляет записи в DLQ, они не отправляются в базу данных.
Это ошибка, которую я заметил в сообщении DLQ.
Это схема, которую я добавил для этой темы
[ { "key": "__connect.errors.topic", "value": "sample_data_test" }, { "key": "__connect.errors.partition", " value": "5" }, { "key": "__connect.errors.offset", "value": "12" }, { "key": "__connect.errors.connector.name", "value": " lcc-vk28vj" }, { "key": "__connect.errors.task.id", "value": "0" }, { "key": "__connect.errors.stage", "value": "VALUE_CONVERTER" }, { "key": "__connect.errors.class.name", "value": "io.confluent.connect.json.JsonSchemaConverter" }, { "key": "__connect.errors.Exception.class.name" , "value": "org.apache.kafka.connect.errors.DataException" }, { "key": "__connect.errors.Exception.message", "value": "Преобразование byte[] в данные Kafka Connect не удалось из-за к ошибке сериализации темы sample_data_test: " }, { "key": "__connect.errors.Exception.stacktrace", "value": "org.apache.kafka.connect.errors.DataException: Преобразование byte[] в Данные Kafka Connect не удалось выполнить из-за ошибки сериализации темы sample_data_test: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect. runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka .connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache .kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka .connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect .runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime .WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java. util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java. util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java. lang.Thread.run(Thread.java:1583)\nВызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения JSON для идентификатора -1\n\tat io.confluent.kafka.serializers.json. AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer. deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t... еще 17\nВызвано: org.apache.kafka.common .errors.SerializationException: неизвестный магический байт!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize( АннотацияKafkaJsonSchemaДесериализатор .java:129)\n\t... еще 20\n" } ]
Итак, в приложении производителя Java то, что у меня есть «Готово» — перед отправкой сообщения в тему. Я также выполнил сериализацию.
**1) Есть ли какие-либо ограничения для mssql соединитель приемника (при отправке записей через третьих лиц)?
2)Есть ли другой подход?**
(Всякий раз, когда я отправляю записи в Kafka тема через процесс Boomi с использованием соединителя Kafka, Restapi или приложения производителя Java) Коннектор приемника Mssql должен обработать запись и отправить в базу данных.
Примечание. При использовании описанных выше подходов данные отправляются в тему, но приемник mssql не может их обработать. записи.
[b]Резюме:[/b] Мне нужно отправить записи в тему Kafka, и с помощью соединителя приемника сервера Microsoft sql нужно получить запись из темы и отправить в базу данных. ** Подходы я пытался отправлять записи в тему.** 1) Ручной подход (создать кнопку сообщения) 2) Rest api (почтальон) < /p> 3)Приложение для создания Java (с использованием Ide) 4)Процесс Boomi ** Проблема:** [list] [*]Каждый раз, когда я создаю записи вручную (в кнопке создания сообщения в слитном облаке), они отправляются в тему и соединитель приемника mssql обработает запись и отправит ее в базу данных → Этот случай успешен
[*]Но если я отправлю с помощью третьих сторон например, Boomi, Postman, Java-приложение ==> Записи отправляются в тему, но соединитель приемника mssql отправляет записи в DLQ, они не отправляются в базу данных. Это ошибка, которую я заметил в сообщении DLQ. Это схема, которую я добавил для этой темы [ { "key": "__connect.errors.topic", "value": "sample_data_test" }, { "key": "__connect.errors.partition", " value": "5" }, { "key": "__connect.errors.offset", "value": "12" }, { "key": "__connect.errors.connector.name", "value": " lcc-vk28vj" }, { "key": "__connect.errors.task.id", "value": "0" }, { "key": "__connect.errors.stage", "value": "VALUE_CONVERTER" }, { "key": "__connect.errors.class.name", "value": "io.confluent.connect.json.JsonSchemaConverter" }, { "key": "__connect.errors.Exception.class.name" , "value": "org.apache.kafka.connect.errors.DataException" }, { "key": "__connect.errors.Exception.message", "value": "Преобразование byte[] в данные Kafka Connect не удалось из-за к ошибке сериализации темы sample_data_test: " }, { "key": "__connect.errors.Exception.stacktrace", "value": "org.apache.kafka.connect.errors.DataException: [b]Преобразование byte[] в Данные Kafka Connect не удалось выполнить из-за ошибки сериализации темы sample_data_test:[/b] \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect. runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka .connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache .kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka .connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect .runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime .WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java. util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java. util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java. lang.Thread.run(Thread.java:1583)\nВызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения JSON для идентификатора -1\n\tat io.confluent.kafka.serializers.json. AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer. deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t... еще 17\nВызвано: org.apache.kafka.common .errors.SerializationException: неизвестный магический байт!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize( АннотацияKafkaJsonSchemaДесериализатор .java:129)\n\t... еще 20\n" } ]
[*]Итак, в приложении производителя Java то, что у меня есть «Готово» — перед отправкой сообщения в тему. Я также выполнил сериализацию.
[/list] **1) Есть ли какие-либо ограничения для mssql соединитель приемника (при отправке записей через третьих лиц)? 2)Есть ли другой подход?** (Всякий раз, когда я отправляю записи в Kafka тема через процесс Boomi с использованием соединителя Kafka, Restapi или приложения производителя Java) Коннектор приемника Mssql должен обработать запись и отправить в базу данных. Примечание. При использовании описанных выше подходов данные отправляются в тему, но приемник mssql не может их обработать. записи.
Резюме: Мне нужно отправить записи в тему Kafka, и с помощью соединителя приемника сервера Microsoft sql нужно получить запись из темы и отправить в базу данных.
**
Подходы я пытался отправлять записи в тему.**
1) Ручной подход (создать кнопку...
Резюме: Мне нужно отправить записи в тему Kafka, и с помощью соединителя приемника сервера Microsoft sql нужно получить запись из темы и отправить в базу данных.
**
Подходы я пытался отправлять записи в тему.**
1) Ручной подход (создать кнопку...
Я новичок в kafka.
Мне нужно загрузить данные json из темы kafka в таблицу postgres.
Ниже приведена структура таблицы
CREATE TABLE dup_emp (
emp_id integer PRIMARY KEY,
emp_name text,
emp_salary integer
);
У меня есть кластер MSK, использующий Apache Kafka версии 3.7.x
Я пытаюсь создать MSK Sink Connector, соединители в MSK Connect поддерживают только Apache Kafka Connect версии 2.7.1 p>
Я использую автономный плагин JDBC Connector, загруженный с...
Я выполняю транзакцию обновления в базе данных Oracle, после завершения обновления в соответствующей теме должно быть опубликовано слитное сообщение Kafka.
Теперь я хотел бы найти это опубликованное сообщение, используя сообщение KEY или VALUE. Как...