(при использовании Confluent Replicator)
Я создал собственный конвертер Kafka Connect и, насколько я понимаю, , toConnectData используется при десериализации сообщений.
В интерфейсе есть 2 функции, вторая включает заголовки и упоминает, что это функция, которая будет вызываться системой Connect. , а первый существует для обратной совместимости.
Два интерфейса:
Код: Выделить всё
byte[] fromConnectData(String topic, Schema schema, Object value);
byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value)
https://github.com/apache/kafka/blob/1e ... che/kafka/ Connect/storage/Converter.java#L52-L68
На самом деле я нахожу первый, который будет использоваться вместо этого, и для моего варианта использования мне нужны заголовки для выполнения этой функции.
Пример реализации конвертера
Код: Выделить всё
package com.example;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
public class ExampleConverter implements Converter {
...
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
throw new RuntimeException("headers not supplied, these are required in order to decrypt");
}
@Override
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
}
}
Я получаю следующее ошибка — которая ясно указывает на то, что она вызывает старую (устаревшую?) функцию без заголовков:
Код: Выделить всё
java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Подробнее здесь: https://stackoverflow.com/questions/791 ... onnectdata
Мобильная версия