Kafka Connect не передает заголовки в Converter#fromConnectDataJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Kafka Connect не передает заголовки в Converter#fromConnectData

Сообщение Anonymous »

TL;др; как реализовать конвертер Kafka, использующий заголовки?

(при использовании Confluent Replicator)
Я создал собственный конвертер Kafka Connect и, насколько я понимаю, , toConnectData используется при десериализации сообщений.
В интерфейсе есть 2 функции, вторая включает заголовки и упоминает, что это функция, которая будет вызываться системой Connect. , а первый существует для обратной совместимости.
Два интерфейса:

Код: Выделить всё

    byte[] fromConnectData(String topic, Schema schema, Object value);
byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value)
Ссылка на интерфейс:
https://github.com/apache/kafka/blob/1e ... che/kafka/ Connect/storage/Converter.java#L52-L68
На самом деле я нахожу первый, который будет использоваться вместо этого, и для моего варианта использования мне нужны заголовки для выполнения этой функции.
Пример реализации конвертера

Код: Выделить всё

package com.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class ExampleConverter implements Converter {

...

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
throw new RuntimeException("headers not supplied, these are required in order to decrypt");
}

@Override
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
}

}
Я запускаю этот конвертер, используя образ контейнера Confluent's Connect confluentinc/cp-enterprise-replicator:7.7.0
Я получаю следующее ошибка — которая ясно указывает на то, что она вызывает старую (устаревшую?) функцию без заголовков:

Код: Выделить всё

java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Прошу совета. Я делаю что-то не так?

Подробнее здесь: https://stackoverflow.com/questions/791 ... onnectdata
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»