согласно исходному коду configDef он должен принимать объект List https://github.com/a0x8o/kafka/blob/mas ... /ConfigDef .java#L97
конфигурация SMT определяется как:enter code here
"transforms": "EqualityCheck",
"transforms.EqualityCheck.type":"org.apache.kafka.connect .transforms.EqualityCheckOnFields$Value",
"transforms.EqualityCheck.fields.notEquality": "field1, field2",
"transforms.EqualityCheck.fieldValues.notEquality": [[9, 0, null, 4.8234, 9.9999],
["pol", "POC", "", null ]],
И это работает, когда я тестирую его на своем локальном компьютере, но когда я на самом деле развертываю конфигурацию соединителя, мне выдается ошибка ниже:
{"error_code":500,"message":"Невозможно десериализовать значение типа java.lang.String из значения массива (токен JsonToken.START_ARRAY)\n в [Источник: УДАЛЕНО (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION отключен); строка: 35, столбец: 52] (через цепочку ссылок: java.util.LinkedHashMap["transforms.EqualityCheck.fields.notEquality"])"
Класс EqualityCheckOnFields:
public class EqualityCheckOnFields implements Transformation {
private static final Logger LOG = LoggerFactory.getLogger(EqualityCheckOnFields.class);
private static final String PURPOSE = "field validation";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("fields.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of fields for not-equality check")
.define("fieldValues.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of lists containing invalid values
....
@Override
public void configure(Map props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
notEqualityFields = config.getList("fields.notEquality");
notEqualityValues = parseValues(config.getList("fieldValues.notEquality"));
...
private List> parsed = new ArrayList();
for (Object value : values) {
if (value instanceof String) {
// Parse String as JSON array
try {
parsed.add(new ObjectMapper(`enter code here`).readValue((String) value, List.class));
} catch (Exception e) {
throw new ConfigException("Invalid JSON format in list values: " + value, e);
}
} else if (value instanceof List) {
// If already a List, cast directly
parsed.add((List) value);
} else {
throw new ConfigException("Unexpected value type in list: " + value.getClass().getName());
}
}
return parsed;
}
Подробнее здесь: https://stackoverflow.com/questions/792 ... e-transfor
Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)? ⇐ JAVA
Программисты JAVA общаются здесь
1733597344
Anonymous
согласно исходному коду configDef он должен принимать объект List https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/config/ConfigDef .java#L97
конфигурация SMT определяется как:enter code here
"transforms": "EqualityCheck",
"transforms.EqualityCheck.type":"org.apache.kafka.connect .transforms.EqualityCheckOnFields$Value",
"transforms.EqualityCheck.fields.notEquality": "field1, field2",
"transforms.EqualityCheck.fieldValues.notEquality": [[9, 0, null, 4.8234, 9.9999],
["pol", "POC", "", null ]],
И это работает, когда я тестирую его на своем локальном компьютере, но когда я на самом деле развертываю конфигурацию соединителя, мне выдается ошибка ниже:
{"error_code":500,"message":"Невозможно десериализовать значение типа java.lang.String из значения массива (токен JsonToken.START_ARRAY)\n в [Источник: УДАЛЕНО (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION отключен); строка: 35, столбец: 52] (через цепочку ссылок: java.util.LinkedHashMap["transforms.EqualityCheck.fields.notEquality"])"
Класс EqualityCheckOnFields:
public class EqualityCheckOnFields implements Transformation {
private static final Logger LOG = LoggerFactory.getLogger(EqualityCheckOnFields.class);
private static final String PURPOSE = "field validation";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("fields.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of fields for not-equality check")
.define("fieldValues.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of lists containing invalid values
....
@Override
public void configure(Map props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
notEqualityFields = config.getList("fields.notEquality");
notEqualityValues = parseValues(config.getList("fieldValues.notEquality"));
...
private List> parsed = new ArrayList();
for (Object value : values) {
if (value instanceof String) {
// Parse String as JSON array
try {
parsed.add(new ObjectMapper(`enter code here`).readValue((String) value, List.class));
} catch (Exception e) {
throw new ConfigException("Invalid JSON format in list values: " + value, e);
}
} else if (value instanceof List) {
// If already a List, cast directly
parsed.add((List) value);
} else {
throw new ConfigException("Unexpected value type in list: " + value.getClass().getName());
}
}
return parsed;
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79261182/how-to-pass-list-of-list-config-for-kafka-connects-smt-single-message-transfor[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия