Невозможно отправить записи в БД из темы Kafka с помощью соединителя приемника сервера Microsoft sql в слитной Kafka.JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно отправить записи в БД из темы Kafka с помощью соединителя приемника сервера Microsoft sql в слитной Kafka.

Сообщение Anonymous »

Резюме: Мне нужно отправить записи в тему Kafka, и с помощью соединителя приемника сервера Microsoft sql нужно получить запись из темы и отправить в базу данных.
**
Подходы я пытался отправлять записи в тему.**
1) Ручной подход (создать кнопку сообщения)
2) Rest api (почтальон) < /p>
3)Приложение для создания Java (с использованием Ide)
4)Процесс Boomi
**
Проблема:**
  • Каждый раз, когда я создаю записи вручную (в кнопке создания сообщения в слитном облаке), они отправляются в тему и соединитель приемника mssql обработает запись и отправит ее в базу данных → Этот случай успешен
  • Но если я отправлю с помощью третьих сторон например, Boomi, Postman, Java-приложение ==> Записи отправляются в тему, но соединитель приемника mssql отправляет записи в DLQ, они не отправляются в базу данных.
    Это ошибка, которую я заметил в сообщении DLQ.
    Это схема, которую я добавил для этой темы
    [ { "key": "__connect.errors.topic", "value": "sample_data_test" }, { "key": "__connect.errors.partition", " value": "5" }, { "key": "__connect.errors.offset", "value": "12" }, { "key": "__connect.errors.connector.name", "value": " lcc-vk28vj" }, { "key": "__connect.errors.task.id", "value": "0" }, { "key": "__connect.errors.stage", "value": "VALUE_CONVERTER" }, { "key": "__connect.errors.class.name", "value": "io.confluent.connect.json.JsonSchemaConverter" }, { "key": "__connect.errors.Exception.class.name" , "value": "org.apache.kafka.connect.errors.DataException" }, { "key": "__connect.errors.Exception.message", "value": "Преобразование byte[] в данные Kafka Connect не удалось из-за к ошибке сериализации темы sample_data_test: " }, { "key": "__connect.errors.Exception.stacktrace", "value": "org.apache.kafka.connect.errors.DataException: Преобразование byte[] в Данные Kafka Connect не удалось выполнить из-за ошибки сериализации темы sample_data_test: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect. runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka .connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache .kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka .connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect .runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime .WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java. util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java. util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java. lang.Thread.run(Thread.java:1583)\nВызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения JSON для идентификатора -1\n\tat io.confluent.kafka.serializers.json. AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer. deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t... еще 17\nВызвано: org.apache.kafka.common .errors.SerializationException: неизвестный магический байт!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize( АннотацияKafkaJsonSchemaДесериализатор .java:129)\n\t... еще 20\n" } ]
  • Итак, в приложении производителя Java то, что у меня есть «Готово» — перед отправкой сообщения в тему. Я также выполнил сериализацию.
**1) Есть ли какие-либо ограничения для mssql соединитель приемника (при отправке записей через третьих лиц)?
2)Есть ли другой подход?**
(Всякий раз, когда я отправляю записи в Kafka тема через процесс Boomi с использованием соединителя Kafka, Restapi или приложения производителя Java) Коннектор приемника Mssql должен обработать запись и отправить в базу данных.
Примечание. При использовании описанных выше подходов данные отправляются в тему, но приемник mssql не может их обработать. записи.

Подробнее здесь: https://stackoverflow.com/questions/789 ... er-sink-co
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»