Пользовательский SMT Kafka соединяет mongodb trasfom _id из строки в objectIdJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Пользовательский SMT Kafka соединяет mongodb trasfom _id из строки в objectId

Сообщение Anonymous »

Я разрабатываю собственный smt для Kafka Connect.
Цель — преобразовать _id в строковый формат из debezium в objecId mongo.
Вот мой код:

Код: Выделить всё

public class StringToObjectIdV2  implements Transformation{
public static final String CODEC_SETTING = "codec.setting";
CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
);

@Override
public R apply(R record) {
if (record.value() instanceof Map) {
Map valueMap = (Map) record.value();
// Assuming the _id field is present and is a String
String idString = (String) valueMap.get("_id");
try {
if (idString != null && ObjectId.isValid(idString)) {
// Convert the string to ObjectId
ObjectId objectId = new ObjectId(idString);
// Replace the String _id with the ObjectId
valueMap.put("_id", objectId);
}
}catch (Exception e) {
e.getCause().printStackTrace();
}

}
// Return the transformed record
return record;
}

@Override
public ConfigDef config() {
return new ConfigDef()
.define(CODEC_SETTING, ConfigDef.Type.STRING, "default_codec",
ConfigDef.Importance.MEDIUM, "The codec to use for BSON types");
}

@Override
public void close() {
}

@Override
public void configure(Map configs) {
String codecSetting = (String) configs.get(CODEC_SETTING);

if ("customCodec".equals(codecSetting)) {
codecRegistry = CodecRegistries.fromRegistries(
CodecRegistries.fromCodecs(new ObjectIdCodecCustom())
);
}
}

public CodecRegistry getCodecRegistry() {
return codecRegistry;
}
}
Когда соединение Kafka запускает код, я всегда получаю эту ошибку:

Код: Выделить всё

(com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class org.bson.types.ObjectId.
at org.bson.internal.CodecCache.lambda$getOrThrow$1(CodecCache.java:52)
Все зависимости включены в мой jar.
Есть предложения?
СпасибоЛука

Подробнее здесь: https://stackoverflow.com/questions/791 ... o-objectid
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»