Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)?

Сообщение Anonymous »

согласно исходному коду configDef он должен принимать объект List https://github.com/a0x8o/kafka/blob/mas ... /ConfigDef .java#L97
конфигурация SMT определяется как:enter code here
"transforms": "EqualityCheck",
"transforms.EqualityCheck.type":"org.apache.kafka.connect .transforms.EqualityCheckOnFields$Value",
"transforms.EqualityCheck.fields.notEquality": "field1, field2",
"transforms.EqualityCheck.fieldValues.notEquality": [[9, 0, null, 4.8234, 9.9999],
["pol", "POC", "", null ]],
И это работает, когда я тестирую его на своем локальном компьютере, но когда я на самом деле развертываю конфигурацию соединителя, мне выдается ошибка ниже:
{"error_code":500,"message":"Невозможно десериализовать значение типа java.lang.String из значения массива (токен JsonToken.START_ARRAY)\n в [Источник: УДАЛЕНО (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION отключен); строка: 35, столбец: 52] (через цепочку ссылок: java.util.LinkedHashMap["transforms.EqualityCheck.fields.notEquality"])"
Класс EqualityCheckOnFields:
public class EqualityCheckOnFields implements Transformation {

private static final Logger LOG = LoggerFactory.getLogger(EqualityCheckOnFields.class);
private static final String PURPOSE = "field validation";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("fields.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of fields for not-equality check")
.define("fieldValues.notEquality", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "List of lists containing invalid values
....

@Override
public void configure(Map props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

notEqualityFields = config.getList("fields.notEquality");
notEqualityValues = parseValues(config.getList("fieldValues.notEquality"));

...
private List> parsed = new ArrayList();
for (Object value : values) {
if (value instanceof String) {
// Parse String as JSON array
try {
parsed.add(new ObjectMapper(`enter code here`).readValue((String) value, List.class));
} catch (Exception e) {
throw new ConfigException("Invalid JSON format in list values: " + value, e);
}
} else if (value instanceof List) {
// If already a List, cast directly
parsed.add((List) value);
} else {
throw new ConfigException("Unexpected value type in list: " + value.getClass().getName());
}
}
return parsed;
}


Подробнее здесь: https://stackoverflow.com/questions/792 ... e-transfor
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Kafka Connect не поддерживает специальный разъем SMT
    Anonymous » » в форуме JAVA
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Kafka Connect не поддерживает специальный разъем SMT
    Anonymous » » в форуме JAVA
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Пользовательский SMT Kafka соединяет mongodb trasfom _id из строки в objectId
    Anonymous » » в форуме JAVA
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Как получить конфигурацию OpenId Connect из URL-адреса ./well-known/openid-connect в OWIN?
    Anonymous » » в форуме C#
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Kafka-реактор для повторного чтения того же сообщения Kafka, если обработка сообщения не удалась
    Anonymous » » в форуме JAVA
    0 Ответы
    109 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»