**
Подходы я пытался отправлять записи в тему.**
1) Ручной подход (создать кнопку сообщения)
2) Rest api (почтальон) < /p>
3)Приложение для создания Java (с использованием Ide)
4)Процесс Boomi
**
Проблема:**
- Каждый раз, когда я создаю записи вручную (в кнопке создания сообщения в слитном облаке), они отправляются в тему и соединитель приемника mssql обработает запись и отправит ее в базу данных → Этот случай успешен
- Но если я отправлю с помощью третьих сторон например, Boomi, Postman, Java-приложение ==> Записи отправляются в тему, но соединитель приемника mssql отправляет записи в DLQ, они не отправляются в базу данных.
Это ошибка, которую я заметил в сообщении DLQ.
Это схема, которую я добавил для этой темы
Код: Выделить всё
[ { "key": "__connect.errors.topic", "value": "sample_data_test" }, { "key": "__connect.errors.partition", "value": "5" }, { "key": "__connect.errors.offset", "value": "12" }, { "key": "__connect.errors.connector.name", "value": "lcc-vk28vj" }, { "key": "__connect.errors.task.id", "value": "0" }, { "key": "__connect.errors.stage", "value": "VALUE_CONVERTER" }, { "key": "__connect.errors.class.name", "value": "io.confluent.connect.json.JsonSchemaConverter" }, { "key": "__connect.errors.exception.class.name", "value": "org.apache.kafka.connect.errors.DataException" }, { "key": "__connect.errors.exception.message", "value": "Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test: " }, { "key": "__connect.errors.exception.stacktrace", "value": "org.apache.kafka.connect.errors.DataException: **Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test:** \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)\n\t... 20 more\n" } ]
**1)Есть ли какие-либо ограничения для соединителя приемника mssql (при отправке записей через сторонние организации)?
2)Существует ли другой подход?* *
(Всякий раз, когда я отправляю записи в тему Kafka через процесс Boomi с использованием соединителя Kafka, Restapi или приложения-производителя Java) Коннектор приемника Mssql должен обработать запись и отправить ее в базу данных.
Примечание. При использовании описанных выше подходов данные отправляются в тему, но приемник mssql не может обработать эти записи.
Подробнее здесь: https://stackoverflow.com/questions/789 ... er-sink-co