Невозможно отправить записи в БД из темы Kafka с помощью соединителя приемника сервера Microsoft sql.JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно отправить записи в БД из темы Kafka с помощью соединителя приемника сервера Microsoft sql.

Сообщение Anonymous »

Резюме: Мне нужно отправить записи в тему Kafka, и с помощью соединителя приемника сервера Microsoft sql нужно получить запись из темы и отправить в базу данных.
**
Подходы я пытался отправлять записи в тему.**
1) Ручной подход (создать кнопку сообщения)
2) Rest api (почтальон) < /p>
3)Приложение для создания Java (с использованием Ide)
4)Процесс Boomi
**
Проблема:**
  • Каждый раз, когда я создаю записи вручную (в кнопке создания сообщения в слитном облаке), они отправляются в тему и соединитель приемника mssql обработает запись и отправит ее в базу данных → Этот случай успешен
  • Но если я отправлю с помощью третьих сторон например, Boomi, Postman, Java-приложение ==> Записи отправляются в тему, но соединитель приемника mssql отправляет записи в DLQ, они не отправляются в базу данных.
    Это ошибка, которую я заметил в сообщении DLQ.
    Это схема, которую я добавил для этой темы

Код: Выделить всё

[   {     "key": "__connect.errors.topic",     "value": "sample_data_test"   },   {     "key": "__connect.errors.partition",     "value": "5"   },   {     "key": "__connect.errors.offset",     "value": "12"   },   {     "key": "__connect.errors.connector.name",     "value": "lcc-vk28vj"   },   {     "key": "__connect.errors.task.id",     "value": "0"   },   {     "key": "__connect.errors.stage",     "value": "VALUE_CONVERTER"   },   {     "key": "__connect.errors.class.name",     "value": "io.confluent.connect.json.JsonSchemaConverter"   },   {     "key": "__connect.errors.exception.class.name",     "value": "org.apache.kafka.connect.errors.DataException"   },   {     "key": "__connect.errors.exception.message",     "value": "Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test: "   },   {     "key": "__connect.errors.exception.stacktrace",     "value": "org.apache.kafka.connect.errors.DataException: **Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test:** \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t...  17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)\n\t... 20 more\n"   } ]
[*]Итак, в приложении-производителе Java я сделал перед отправкой сообщения в тему также сериализацию.

**1)Есть ли какие-либо ограничения для соединителя приемника mssql (при отправке записей через сторонние организации)?
2)Существует ли другой подход?* *
(Всякий раз, когда я отправляю записи в тему Kafka через процесс Boomi с использованием соединителя Kafka, Restapi или приложения-производителя Java) Коннектор приемника Mssql должен обработать запись и отправить ее в базу данных.
Примечание. При использовании описанных выше подходов данные отправляются в тему, но приемник mssql не может обработать эти записи.

Подробнее здесь: https://stackoverflow.com/questions/789 ... er-sink-co
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»