Kafka Connect не поддерживает специальный разъем SMTJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Kafka Connect не поддерживает специальный разъем SMT

Сообщение Anonymous »

Я запускаю Connect.standalone.bat локально и использую MongoDB Sink Connector. Записи в теме имеют поля даты, хранящиеся как Timestamp или ZonedDateTime, но в любом случае mongo сохраняет их как длинные или строковые, а не как собственный объект даты. Мне удалось их правильно преобразовать с помощью TimestampConverter$Value, но он не работает с вложенными полями — «tomatoes.lastUpdated» не работает. Я решил создать собственный конвертер, создал jar и добавил его в свой путь к соединителям, но когда я отправляю обновленный файл Properties.json на сервер подключения с помощью этой строки:
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
запрос является постоянным режимом отправки запроса... до истечения срока действия запроса. Я попробовал добавить
org.example.DateTransformer$Value согласно документации, но выдает ошибку, что такого класса не существует.
Я не знаю, чего мне не хватает.
public class DateTransformer implements Transformation {
private static final Logger log = LoggerFactory.getLogger(DateTransformer.class);

@Override
public void configure(Map configs) {
}

@Override
public R apply(R connectRecord) {
Object value = connectRecord.value();

if (value instanceof Struct) {
Struct structValue = (Struct) value;
if (structValue.schema().field("tomatoes") != null) {
Struct tomatoes = structValue.getStruct("tomatoes");
if (tomatoes != null && tomatoes.schema().field("lastUpdated") != null) {
String lastUpdated = tomatoes.getString("lastUpdated");
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
Date date = simpleDateFormat.parse(lastUpdated);
tomatoes.put("lastUpdated", date);
} catch (ParseException e) {
log.error("Failed to parse date: {}", lastUpdated, e);
}
} else {
log.warn("'tomatoes' struct is null. Skipping transformation.");
}
} else {
log.warn("Field 'tomatoes' not found in schema. Skipping transformation.");
}
} else {
log.warn("Record value is not an instance of Struct. Skipping transformation.");
}
return connectRecord;
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public void close() {
}

Запись Kafka:
{
"plot": "......",
"genres": [
"Comedy"
],
"runtime": 71,
"cast": [
"Lois Wilson"
],
"title": "Movie Title",
"fullPlot": null,
"countries": [
"USA"
],
"awards": {
"wins": 1,
"nominations": 0,
"text": "1 win."
},
"year": 1921,
"type": "movie",
"tomatoes": {
"rating": 2.5,
"lastUpdated": 1430593428000
},
"released": -1520035200000,
"recordCreatedOn":2024-11-11T08:33:02.543+00:00,
"recordLastUpdatedOn":2024-11-11T08:33:02.543+00:00,
}

pom.xml:

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0

org.example
smt
1.0-SNAPSHOT
jar


17
17





org.apache.kafka
connect-api
3.8.0
provided


org.apache.kafka
connect-transforms
3.8.0



com.fasterxml.jackson.core
jackson-databind
2.13.1



org.slf4j
slf4j-api
2.0.16







org.apache.maven.plugins
maven-shade-plugin
3.2.1


package

shade




*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA












properties.json
{
"name": "mongodb-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "movies",
"connection.uri": "mongodb://localhost:27017/",
"database": "movies",
"collection": "movies",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "AddRecordCreatedOn,FormatRecordCreatedOn,DateFormatTransformation",
"transforms.AddRecordCreatedOn.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddRecordCreatedOn.timestamp.field": "recordCreatedOn",
"transforms.FormatRecordCreatedOn.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.FormatRecordCreatedOn.field": "recordCreatedOn",
"transforms.FormatRecordCreatedOn.target.type": "Timestamp",
"transforms.FormatRecordCreatedOn.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
}
}

connect-standalone.properties
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=D:/kafka/connect/connect.offsets
offset.flush.interval.ms=1000

plugin.path=D:/kafka/connectors


Подробнее здесь: https://stackoverflow.com/questions/791 ... -connector
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»