Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)?

Сообщение Anonymous »

согласно исходному коду configDef он должен принимать объект List https://github.com/a0x8o/kafka/blob/mas ... /ConfigDef .java#L97
конфигурация SMT определяется как:enter code here
"transforms": "EqualityCheck",
"transforms.EqualityCheck.type":"org.apache.kafka.connect .transforms.EqualityCheckOnFields$Value",
"transforms.EqualityCheck.fields.notEquality": "field1, field2",
"transforms.EqualityCheck.fieldValues.notEquality": [[9, 0, null, 4.8234, 9.9999],
["pol", "POC", "", null ]],
И это работает, когда я тестирую его на своем локальном компьютере, но когда я на самом деле развертываю конфигурацию соединителя, мне выдается ошибка ниже:
{"error_code":500,"message":"Невозможно десериализовать значение типа java.lang.String из значения массива (токен JsonToken.START_ARRAY)\n в [Источник: УДАЛЕНО (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION отключен); строка: 35, столбец: 52] (через цепочку ссылок: java.util.LinkedHashMap["transforms.EqualityCheck.fields.notEquality"])"
Класс EqualityCheckOnFields:
public class EqualityCheckOnFields implements Transformation {

private static final Logger LOG = LoggerFactory.getLogger(EqualityCheckOnFields.class);
private static final String PURPOSE = "field validation";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("fields.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of fields for not-equality check")
.define("fieldValues.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of lists containing invalid values
....

@Override
public void configure(Map props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

notEqualityFields = config.getList("fields.notEquality");
notEqualityValues = parseValues(config.getList("fieldValues.notEquality"));

...
private List> parsed = new ArrayList();
for (Object value : values) {
if (value instanceof String) {
// Parse String as JSON array
try {
parsed.add(new ObjectMapper(`enter code here`).readValue((String) value, List.class));
} catch (Exception e) {
throw new ConfigException("Invalid JSON format in list values: " + value, e);
}
} else if (value instanceof List) {
// If already a List, cast directly
parsed.add((List) value);
} else {
throw new ConfigException("Unexpected value type in list: " + value.getClass().getName());
}
}
return parsed;
}


Подробнее здесь: https://stackoverflow.com/questions/792 ... e-transfor
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»