Пользовательский SMT Kafka соединяет mongodb trasfom _id из строки в objectIdJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Пользовательский SMT Kafka соединяет mongodb trasfom _id из строки в objectId

Сообщение Anonymous »

Я разрабатываю собственный smt для Kafka Connect.
Цель — преобразовать _id в строковый формат из debezium в objecId mongo.
Вот мой код:

Код: Выделить всё

public class StringToObjectIdV2  implements Transformation{
public static final String CODEC_SETTING = "codec.setting";
CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
);

@Override
public R apply(R record) {
if (record.value() instanceof Map) {
Map valueMap = (Map) record.value();
// Assuming the _id field is present and is a String
String idString = (String) valueMap.get("_id");
try {
if (idString != null && ObjectId.isValid(idString)) {
// Convert the string to ObjectId
ObjectId objectId = new ObjectId(idString);
// Replace the String _id with the ObjectId
valueMap.put("_id", objectId);
}
}catch (Exception e) {
e.getCause().printStackTrace();
}

}
// Return the transformed record
return record;
}

@Override
public ConfigDef config() {
return new ConfigDef()
.define(CODEC_SETTING, ConfigDef.Type.STRING, "default_codec",
ConfigDef.Importance.MEDIUM, "The codec to use for BSON types");
}

@Override
public void close() {
}

@Override
public void configure(Map configs) {
String codecSetting = (String) configs.get(CODEC_SETTING);

if ("customCodec".equals(codecSetting)) {
codecRegistry = CodecRegistries.fromRegistries(
CodecRegistries.fromCodecs(new ObjectIdCodecCustom())
);
}
}

public CodecRegistry getCodecRegistry() {
return codecRegistry;
}
}
Когда соединение Kafka запускает код, я всегда получаю эту ошибку:

Код: Выделить всё

(com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class org.bson.types.ObjectId.
at org.bson.internal.CodecCache.lambda$getOrThrow$1(CodecCache.java:52)
Все зависимости включены в мой jar.
Есть предложения?
СпасибоЛука

Подробнее здесь: https://stackoverflow.com/questions/791 ... o-objectid
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Kafka Connect не поддерживает специальный разъем SMT
    Anonymous » » в форуме JAVA
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Kafka Connect не поддерживает специальный разъем SMT
    Anonymous » » в форуме JAVA
    0 Ответы
    13 Просмотры
    Последнее сообщение Anonymous
  • Как передать конфигурацию списка списков для SMT Kafka Connect (преобразование одного сообщения)?
    Anonymous » » в форуме JAVA
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Изменение свойства ManagementClass.Path не соединяет объект с новым классом WMI.
    Anonymous » » в форуме C#
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Angular соединяет стили, используя несуществующий URL-адрес
    Anonymous » » в форуме CSS
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»