Я запускаю Connect.standalone.bat локально и использую MongoDB Sink Connector. Записи в теме имеют поля даты, хранящиеся как Timestamp или ZonedDateTime, но в любом случае mongo сохраняет их как длинные или строковые, а не как собственный объект даты. Мне удалось их правильно преобразовать с помощью TimestampConverter$Value, но он не работает с вложенными полями — «tomatoes.lastUpdated» не работает. Я решил создать собственный конвертер, создал jar и добавил его в свой путь к соединителям, но когда я отправляю обновленный файл Properties.json на сервер подключения с помощью этой строки:
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
запрос является постоянным режимом отправки запроса... до истечения срока действия запроса. Я попробовал добавить
org.example.DateTransformer$Value согласно документации, но выдает ошибку, что такого класса не существует.
Я не знаю, чего мне не хватает.
public class DateTransformer implements Transformation {
private static final Logger log = LoggerFactory.getLogger(DateTransformer.class);
@Override
public void configure(Map configs) {
}
@Override
public R apply(R connectRecord) {
Object value = connectRecord.value();
if (value instanceof Struct) {
Struct structValue = (Struct) value;
if (structValue.schema().field("tomatoes") != null) {
Struct tomatoes = structValue.getStruct("tomatoes");
if (tomatoes != null && tomatoes.schema().field("lastUpdated") != null) {
String lastUpdated = tomatoes.getString("lastUpdated");
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
Date date = simpleDateFormat.parse(lastUpdated);
tomatoes.put("lastUpdated", date);
} catch (ParseException e) {
log.error("Failed to parse date: {}", lastUpdated, e);
}
} else {
log.warn("'tomatoes' struct is null. Skipping transformation.");
}
} else {
log.warn("Field 'tomatoes' not found in schema. Skipping transformation.");
}
} else {
log.warn("Record value is not an instance of Struct. Skipping transformation.");
}
return connectRecord;
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {
}
Запись Kafka:
{
"plot": "......",
"genres": [
"Comedy"
],
"runtime": 71,
"cast": [
"Lois Wilson"
],
"title": "Movie Title",
"fullPlot": null,
"countries": [
"USA"
],
"awards": {
"wins": 1,
"nominations": 0,
"text": "1 win."
},
"year": 1921,
"type": "movie",
"tomatoes": {
"rating": 2.5,
"lastUpdated": 1430593428000
},
"released": -1520035200000,
"recordCreatedOn":2024-11-11T08:33:02.543+00:00,
"recordLastUpdatedOn":2024-11-11T08:33:02.543+00:00,
}
pom.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
org.example
smt
1.0-SNAPSHOT
jar
17
17
org.apache.kafka
connect-api
3.8.0
provided
org.apache.kafka
connect-transforms
3.8.0
com.fasterxml.jackson.core
jackson-databind
2.13.1
org.slf4j
slf4j-api
2.0.16
org.apache.maven.plugins
maven-shade-plugin
3.2.1
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
properties.json
{
"name": "mongodb-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "movies",
"connection.uri": "mongodb://localhost:27017/",
"database": "movies",
"collection": "movies",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "AddRecordCreatedOn,FormatRecordCreatedOn,DateFormatTransformation",
"transforms.AddRecordCreatedOn.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddRecordCreatedOn.timestamp.field": "recordCreatedOn",
"transforms.FormatRecordCreatedOn.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.FormatRecordCreatedOn.field": "recordCreatedOn",
"transforms.FormatRecordCreatedOn.target.type": "Timestamp",
"transforms.FormatRecordCreatedOn.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
}
}
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=D:/kafka/connect/connect.offsets
offset.flush.interval.ms=1000
plugin.path=D:/kafka/connectors
Подробнее здесь: https://stackoverflow.com/questions/791 ... -connector
Kafka Connect не поддерживает специальный разъем SMT ⇐ JAVA
Программисты JAVA общаются здесь
-
Anonymous
1732056972
Anonymous
Я запускаю Connect.standalone.bat локально и использую MongoDB Sink Connector. Записи в теме имеют поля даты, хранящиеся как Timestamp или ZonedDateTime, но в любом случае mongo сохраняет их как длинные или строковые, а не как собственный объект даты. Мне удалось их правильно преобразовать с помощью TimestampConverter$Value, но он не работает с вложенными полями — «tomatoes.lastUpdated» не работает. Я решил создать собственный конвертер, создал jar и добавил его в свой путь к соединителям, но когда я отправляю обновленный файл Properties.json на сервер подключения с помощью этой строки:
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
запрос является постоянным режимом отправки запроса... до истечения срока действия запроса. Я попробовал добавить
org.example.DateTransformer$Value согласно документации, но выдает ошибку, что такого класса не существует.
Я не знаю, чего мне не хватает.
public class DateTransformer implements Transformation {
private static final Logger log = LoggerFactory.getLogger(DateTransformer.class);
@Override
public void configure(Map configs) {
}
@Override
public R apply(R connectRecord) {
Object value = connectRecord.value();
if (value instanceof Struct) {
Struct structValue = (Struct) value;
if (structValue.schema().field("tomatoes") != null) {
Struct tomatoes = structValue.getStruct("tomatoes");
if (tomatoes != null && tomatoes.schema().field("lastUpdated") != null) {
String lastUpdated = tomatoes.getString("lastUpdated");
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
Date date = simpleDateFormat.parse(lastUpdated);
tomatoes.put("lastUpdated", date);
} catch (ParseException e) {
log.error("Failed to parse date: {}", lastUpdated, e);
}
} else {
log.warn("'tomatoes' struct is null. Skipping transformation.");
}
} else {
log.warn("Field 'tomatoes' not found in schema. Skipping transformation.");
}
} else {
log.warn("Record value is not an instance of Struct. Skipping transformation.");
}
return connectRecord;
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {
}
Запись Kafka:
{
"plot": "......",
"genres": [
"Comedy"
],
"runtime": 71,
"cast": [
"Lois Wilson"
],
"title": "Movie Title",
"fullPlot": null,
"countries": [
"USA"
],
"awards": {
"wins": 1,
"nominations": 0,
"text": "1 win."
},
"year": 1921,
"type": "movie",
"tomatoes": {
"rating": 2.5,
"lastUpdated": 1430593428000
},
"released": -1520035200000,
"recordCreatedOn":2024-11-11T08:33:02.543+00:00,
"recordLastUpdatedOn":2024-11-11T08:33:02.543+00:00,
}
pom.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
org.example
smt
1.0-SNAPSHOT
jar
17
17
org.apache.kafka
connect-api
3.8.0
provided
org.apache.kafka
connect-transforms
3.8.0
com.fasterxml.jackson.core
jackson-databind
2.13.1
org.slf4j
slf4j-api
2.0.16
org.apache.maven.plugins
maven-shade-plugin
3.2.1
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
properties.json
{
"name": "mongodb-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "movies",
"connection.uri": "mongodb://localhost:27017/",
"database": "movies",
"collection": "movies",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "AddRecordCreatedOn,FormatRecordCreatedOn,DateFormatTransformation",
"transforms.AddRecordCreatedOn.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddRecordCreatedOn.timestamp.field": "recordCreatedOn",
"transforms.FormatRecordCreatedOn.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.FormatRecordCreatedOn.field": "recordCreatedOn",
"transforms.FormatRecordCreatedOn.target.type": "Timestamp",
"transforms.FormatRecordCreatedOn.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
}
}
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=D:/kafka/connect/connect.offsets
offset.flush.interval.ms=1000
plugin.path=D:/kafka/connectors
Подробнее здесь: [url]https://stackoverflow.com/questions/79176966/kafka-connect-doesnt-accept-custom-smt-connector[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия