public class StringToObjectIdV2 implements Transformation{
public static final String CODEC_SETTING = "codec.setting";
CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
);
@Override
public R apply(R record) {
if (record.value() instanceof Map) {
Map valueMap = (Map) record.value();
// Assuming the _id field is present and is a String
String idString = (String) valueMap.get("_id");
try {
if (idString != null && ObjectId.isValid(idString)) {
// Convert the string to ObjectId
ObjectId objectId = new ObjectId(idString);
// Replace the String _id with the ObjectId
valueMap.put("_id", objectId);
}
}catch (Exception e) {
e.getCause().printStackTrace();
}
}
// Return the transformed record
return record;
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define(CODEC_SETTING, ConfigDef.Type.STRING, "default_codec",
ConfigDef.Importance.MEDIUM, "The codec to use for BSON types");
}
@Override
public void close() {
}
@Override
public void configure(Map configs) {
String codecSetting = (String) configs.get(CODEC_SETTING);
if ("customCodec".equals(codecSetting)) {
codecRegistry = CodecRegistries.fromRegistries(
CodecRegistries.fromCodecs(new ObjectIdCodecCustom())
);
}
}
public CodecRegistry getCodecRegistry() {
return codecRegistry;
}
}
Когда соединение Kafka запускает код, я всегда получаю эту ошибку:
(com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class org.bson.types.ObjectId.
at org.bson.internal.CodecCache.lambda$getOrThrow$1(CodecCache.java:52)
Все зависимости включены в мой jar.
Есть предложения?
СпасибоЛука
Я разрабатываю собственный smt для Kafka Connect. Цель — преобразовать _id в строковый формат из debezium в objecId mongo. Вот мой код: [code]public class StringToObjectIdV2 implements Transformation{ public static final String CODEC_SETTING = "codec.setting"; CodecRegistry codecRegistry = CodecRegistries.fromRegistries( MongoClientSettings.getDefaultCodecRegistry(), CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build()) );
@Override public R apply(R record) { if (record.value() instanceof Map) { Map valueMap = (Map) record.value(); // Assuming the _id field is present and is a String String idString = (String) valueMap.get("_id"); try { if (idString != null && ObjectId.isValid(idString)) { // Convert the string to ObjectId ObjectId objectId = new ObjectId(idString); // Replace the String _id with the ObjectId valueMap.put("_id", objectId); } }catch (Exception e) { e.getCause().printStackTrace(); }
} // Return the transformed record return record; }
@Override public ConfigDef config() { return new ConfigDef() .define(CODEC_SETTING, ConfigDef.Type.STRING, "default_codec", ConfigDef.Importance.MEDIUM, "The codec to use for BSON types"); }
public CodecRegistry getCodecRegistry() { return codecRegistry; } } [/code] Когда соединение Kafka запускает код, я всегда получаю эту ошибку: [code](com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData) org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class org.bson.types.ObjectId. at org.bson.internal.CodecCache.lambda$getOrThrow$1(CodecCache.java:52) [/code] Все зависимости включены в мой jar. Есть предложения? СпасибоЛука
Я запускаю Connect.standalone.bat локально и использую MongoDB Sink Connector. Записи в теме имеют поля даты, хранящиеся как Timestamp или ZonedDateTime, но в любом случае mongo сохраняет их как длинные или строковые, а не как собственный объект...
Я запускаю Connect.standalone.bat локально и использую MongoDB Sink Connector. Записи в теме имеют поля даты, хранящиеся как Timestamp или ZonedDateTime, но в любом случае mongo сохраняет их как длинные или строковые, а не как собственный объект...
согласно исходному коду configDef он должен принимать объект List .java#L97
конфигурация SMT определяется как:enter code here
transforms : EqualityCheck ,
transforms.EqualityCheck.type : org.apache.kafka.connect...
Когда для свойства ManagementClass.Path установлено новое значение, ManagementClass будет отключен от любого ранее связанного класса WMI. . Переподключитесь к новому пути классов WMI.
У меня есть приложение со стеком angular и nx.
Проблема в том, что когда я в адресной строке ввожу URL, состоящий из нескольких частей, например localhost:4200/ app/user или localhost:4200/restore/user-id , появится ошибка:
Refused to apply style...