согласно исходному коду configDef он должен принимать объект List https://github.com/a0x8o/kafka/blob/mas ... /ConfigDef .java#L97
конфигурация SMT определяется как:enter code here
"transforms": "EqualityCheck",
"transforms.EqualityCheck.type":"org.apache.kafka.connect .transforms.EqualityCheckOnFields$Value",
"transforms.EqualityCheck.fields.notEquality": "field1, field2",
"transforms.EqualityCheck.fieldValues.notEquality": [[9, 0, null, 4.8234, 9.9999],
["pol", "POC", "", null ]],
И это работает, когда я тестирую его на своем локальном компьютере, но когда я на самом деле развертываю конфигурацию соединителя, мне выдается ошибка ниже:
{"error_code":500,"message":"Невозможно десериализовать значение типа java.lang.String из значения массива (токен JsonToken.START_ARRAY)\n в [Источник: УДАЛЕНО (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION отключен); строка: 35, столбец: 52] (через цепочку ссылок: java.util.LinkedHashMap["transforms.EqualityCheck.fields.notEquality"])"
Класс EqualityCheckOnFields:
public class EqualityCheckOnFields implements Transformation {
private static final Logger LOG = LoggerFactory.getLogger(EqualityCheckOnFields.class);
private static final String PURPOSE = "field validation";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("fields.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of fields for not-equality check")
.define("fieldValues.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of lists containing invalid values
....
@Override
public void configure(Map props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
notEqualityFields = config.getList("fields.notEquality");
notEqualityValues = parseValues(config.getList("fieldValues.notEquality"));
...
private List> parsed = new ArrayList();
for (Object value : values) {
if (value instanceof String) {
// Parse String as JSON array
try {
parsed.add(new ObjectMapper(`enter code here`).readValue((String) value, List.class));
} catch (Exception e) {
throw new ConfigException("Invalid JSON format in list values: " + value, e);
}
} else if (value instanceof List) {
// If already a List, cast directly
parsed.add((List) value);
} else {
throw new ConfigException("Unexpected value type in list: " + value.getClass().getName());
}
}
return parsed;
}
Подробнее здесь: https://stackoverflow.com/questions/792 ... e-transfor
Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)? ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Пользовательский SMT Kafka соединяет mongodb trasfom _id из строки в objectId
Anonymous » » в форуме JAVA - 0 Ответы
- 19 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Как получить конфигурацию OpenId Connect из URL-адреса ./well-known/openid-connect в OWIN?
Anonymous » » в форуме C# - 0 Ответы
- 27 Просмотры
-
Последнее сообщение Anonymous
-