Kafka Connect does not support a custom SMT

Post by Anonymous »

I'm running connect-standalone.bat locally and using the MongoDB Sink Connector. Records in the topic have date fields stored as Timestamp or ZonedDateTime, but either way Mongo persists them as longs or strings instead of native date objects. I managed to convert them properly with TimestampConverter$Value, but it does not work on nested fields: "tomatoes.lastUpdated" fails. So I decided to write my own converter, built a jar, and added it to my connectors path, but when I POST the updated properties.json to the Connect worker with this line:
"transforms.DateFormatTransformation.type": "org.example.DateTransformer"
the request just hangs until it times out. Following the documentation I also tried
org.example.DateTransformer$Value, but that throws an error saying no such class exists.
I don't know what I'm missing.
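
For reference, my understanding (an assumption on my part, not something the docs spell out for custom SMTs) is that the $Value suffix in names like TimestampConverter$Value is simply the JVM name of a static inner class: the built-in SMTs declare Key/Value variants inside an abstract base class. A minimal sketch of that pattern (BaseTransformer is a hypothetical name):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

// Hypothetical sketch, not my real code: "$Value" resolves only because
// Value is a static inner class of the transform. A top-level class such
// as org.example.DateTransformer has no DateTransformer$Value, which would
// explain the "class does not exist" error I get with that suffix.
public abstract class BaseTransformer<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void close() { }

    public static class Value<R extends ConnectRecord<R>> extends BaseTransformer<R> {
        @Override
        public R apply(R record) { return record; } // placeholder; real transforms keep shared logic in the base
    }
}

My actual transformer, by contrast, is a single top-level class: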
package org.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

public class DateTransformer<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger log = LoggerFactory.getLogger(DateTransformer.class);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public R apply(R connectRecord) {
        Object value = connectRecord.value();

        if (value instanceof Struct) {
            Struct structValue = (Struct) value;
            if (structValue.schema().field("tomatoes") != null) {
                Struct tomatoes = structValue.getStruct("tomatoes");
                if (tomatoes != null && tomatoes.schema().field("lastUpdated") != null) {
                    // getString throws if the schema type of lastUpdated is not STRING
                    String lastUpdated = tomatoes.getString("lastUpdated");
                    try {
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
                        Date date = simpleDateFormat.parse(lastUpdated);
                        // overwrite the string value with a java.util.Date in place
                        tomatoes.put("lastUpdated", date);
                    } catch (ParseException e) {
                        log.error("Failed to parse date: {}", lastUpdated, e);
                    }
                } else {
                    log.warn("'tomatoes' struct is null. Skipping transformation.");
                }
            } else {
                log.warn("Field 'tomatoes' not found in schema. Skipping transformation.");
            }
        } else {
            log.warn("Record value is not an instance of Struct. Skipping transformation.");
        }
        return connectRecord;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}

The Kafka record:
{
  "plot": "......",
  "genres": ["Comedy"],
  "runtime": 71,
  "cast": ["Lois Wilson"],
  "title": "Movie Title",
  "fullPlot": null,
  "countries": ["USA"],
  "awards": {
    "wins": 1,
    "nominations": 0,
    "text": "1 win."
  },
  "year": 1921,
  "type": "movie",
  "tomatoes": {
    "rating": 2.5,
    "lastUpdated": 1430593428000
  },
  "released": -1520035200000,
  "recordCreatedOn": "2024-11-11T08:33:02.543+00:00",
  "recordLastUpdatedOn": "2024-11-11T08:33:02.543+00:00"
}
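
One inconsistency I notice while posting this: in the record above, tomatoes.lastUpdated is epoch millis (a long), while my SMT calls getString on it. A small sketch of how I could coerce either shape (hypothetical helper, not part of the jar that fails to load):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper (my assumption about a safer approach): accept
// lastUpdated either as epoch millis or as an ISO-8601 string and
// return a java.util.Date in both cases.
public final class DateCoercion {
    private DateCoercion() { }

    public static Date coerceToDate(Object raw) throws ParseException {
        if (raw instanceof Long) {
            return new Date((Long) raw); // epoch millis, e.g. 1430593428000
        }
        if (raw instanceof String) {
            return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse((String) raw);
        }
        throw new IllegalArgumentException("Unsupported lastUpdated type: " + raw);
    }
}

Inside apply I would then read the field with tomatoes.get("lastUpdated") instead of getString.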

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>smt</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.8.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.16</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
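One more suspicion, since the worker logs below show plugin.discovery = hybrid_warn: newer Connect versions scan plugins via ServiceLoader as well as reflectively, and a ServiceLoader-discoverable SMT jar is expected to carry a service manifest naming the Transformation implementation. If I read the mechanism right (hybrid mode should still find the class reflectively, so this is a guess at the warning-free path, not a confirmed fix), the file src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation would contain a single line:

org.example.DateTransformer
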

properties.json
{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "movies",
    "connection.uri": "mongodb://localhost:27017/",
    "database": "movies",
    "collection": "movies",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "AddRecordCreatedOn,FormatRecordCreatedOn,DateFormatTransformation",
    "transforms.AddRecordCreatedOn.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.AddRecordCreatedOn.timestamp.field": "recordCreatedOn",
    "transforms.FormatRecordCreatedOn.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.FormatRecordCreatedOn.field": "recordCreatedOn",
    "transforms.FormatRecordCreatedOn.target.type": "Timestamp",
    "transforms.FormatRecordCreatedOn.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
    "transforms.DateFormatTransformation.type": "org.example.DateTransformer"
  }
}
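
For completeness, this is how I submit it (standard Connect REST API, on the port shown in the logs):

curl -X POST -H "Content-Type: application/json" --data @properties.json http://localhost:8083/connectors

or, to update the existing connector, a PUT of just the inner config object to http://localhost:8083/connectors/mongodb-sink/config.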

connect-standalone.properties
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=D:/kafka/connect/connect.offsets
offset.flush.interval.ms=1000

plugin.path=D:/kafka/connectors
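
To check whether the worker registered the SMT at all, I believe recent Kafka versions let the plugin listing include transformations too (the connectorsOnly flag, if I remember correctly):

curl "http://localhost:8083/connector-plugins?connectorsOnly=false"

If the jar under D:/kafka/connectors was scanned successfully, org.example.DateTransformer should show up there with type "transformation".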

These are the logs after running .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties and submitting the connectors over REST. The logs are exactly the same if I do it this way instead:
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-mongodb-source.properties .\config\connect-mongodb-sink.properties
[2024-11-19 17:19:21,602] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.AbstractConnectCli:120)
[2024-11-19 17:19:21,626] INFO Loading plugin from: D:\kafka\connectors\mongo-kafka-connect-1.13.0-confluent.jar (org.apache.kafka.connect.runtime.isolation.PluginScanner:75)
[2024-11-19 17:19:21,814] INFO Registered loader: PluginClassLoader{pluginLocation=file:/D:/kafka/connectors/mongo-kafka-connect-1.13.0-confluent.jar} (org.apache.kafka.connect.runtime.isolation.PluginScanner:80)
[2024-11-19 17:19:21,816] INFO Loading plugin from: D:\kafka\connectors\smt-1.0-SNAPSHOT-jar-with-dependencies.jar (org.apache.kafka.connect.runtime.isolation.PluginScanner:75)
[2024-11-19 17:19:21,863] INFO Registered loader: PluginClassLoader{pluginLocation=file:/D:/kafka/connectors/smt-1.0-SNAPSHOT-jar-with-dependencies.jar} (org.apache.kafka.connect.runtime.isolation.PluginScanner:80)
[2024-11-19 17:19:21,863] INFO Loading plugin from: classpath (org.apache.kafka.connect.runtime.isolation.PluginScanner:75)
[2024-11-19 17:19:21,876] INFO Registered loader: jdk.internal.loader.ClassLoaders$AppClassLoader@5acf9800 (org.apache.kafka.connect.runtime.isolation.PluginScanner:80)
[2024-11-19 17:19:21,877] INFO Scanning plugins with ServiceLoaderScanner took 254 ms (org.apache.kafka.connect.runtime.isolation.PluginScanner:70)
[2024-11-19 17:19:21,878] INFO Loading plugin from: D:\kafka\connectors\mongo-kafka-connect-1.13.0-confluent.jar (org.apache.kafka.connect.runtime.isolation.PluginScanner:75)
[2024-11-19 17:19:22,149] INFO Registered loader: PluginClassLoader{pluginLocation=file:/D:/kafka/connectors/mongo-kafka-connect-1.13.0-confluent.jar}
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
auto.include.jmx.reporter = true
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
config.providers = []
connector.client.config.override.policy = All
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = [http://:8083]
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 1000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = D:/kafka/connect/connect.offsets
plugin.discovery = hybrid_warn
plugin.path = [D:/kafka/connectors]
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:372)
[2024-11-19 17:19:23,451] INFO Creating Kafka admin client (org.apache.kafka.connect.runtime.WorkerConfig:283)
[2024-11-19 17:19:23,456] INFO AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.controllers = []
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
enable.metrics.push = true
metadata.max.age.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
(org.apache.kafka.clients.admin.AdminClientConfig:372)
[2024-11-19 17:19:23,574] INFO These configurations '[offset.flush.interval.ms, key.converter.schemas.enable, offset.storage.file.filename, value.converter.schemas.enable, plugin.path, value.converter, key.converter]' were supplied but are not used yet. (org.apache.kafka.clients.admin.AdminClientConfig:381)
[2024-11-19 17:19:23,576] INFO Kafka version: 3.8.0 (org.apache.kafka.common.utils.AppInfoParser:124)
[2024-11-19 17:19:23,577] INFO Kafka commitId: 771b9576b00ecf5b (org.apache.kafka.common.utils.AppInfoParser:125)
[2024-11-19 17:19:23,577] INFO Kafka startTimeMs: 1732029563575 (org.apache.kafka.common.utils.AppInfoParser:126)
[2024-11-19 17:19:23,814] INFO Kafka cluster ID: daqwrCtFQPyzbeevhFwA1A (org.apache.kafka.connect.runtime.WorkerConfig:300)
[2024-11-19 17:19:23,816] INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:88)
[2024-11-19 17:19:23,822] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:684)
[2024-11-19 17:19:23,823] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:688)
[2024-11-19 17:19:23,824] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:694)
[2024-11-19 17:19:23,830] INFO PublicConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
listeners = [http://:8083]
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
topic.tracking.allow.reset = true
topic.tracking.enable = true
(org.apache.kafka.connect.runtime.rest.RestServerConfig$PublicConfig:372)
[2024-11-19 17:19:23,841] INFO Logging initialized @2708ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:170)
[2024-11-19 17:19:23,873] INFO Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:121)
[2024-11-19 17:19:23,874] INFO Initializing REST server (org.apache.kafka.connect.runtime.rest.RestServer:192)
[2024-11-19 17:19:23,900] INFO jetty-9.4.54.v20240208; built: 2024-02-08T19:42:39.027Z; git: cef3fbd6d736a21e7d541a5db490381d95a2047d; jvm 21.0.2+13-LTS-58 (org.eclipse.jetty.server.Server:375)
[2024-11-19 17:19:23,935] INFO Started http_8083@192b472d{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:333)
[2024-11-19 17:19:23,936] INFO Started @2803ms (org.eclipse.jetty.server.Server:415)
[2024-11-19 17:19:23,953] INFO Advertised URI: http://192.168.1.3:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:412)
[2024-11-19 17:19:23,954] INFO REST server listening at http://192.168.1.3:8083/, advertising URL http://192.168.1.3:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:212)
[2024-11-19 17:19:23,955] INFO Advertised URI: http://192.168.1.3:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:412)
[2024-11-19 17:19:23,955] INFO REST admin endpoints at http://192.168.1.3:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:215)
[2024-11-19 17:19:23,956] INFO Advertised URI: http://192.168.1.3:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:412)
[2024-11-19 17:19:23,957] INFO Setting up All Policy for ConnectorClientConfigOverride. This will allow all client configurations to be overridden (org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy:44)
[2024-11-19 17:19:23,958] INFO JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
(org.apache.kafka.connect.json.JsonConverterConfig:372)
[2024-11-19 17:19:23,967] INFO Kafka version: 3.8.0 (org.apache.kafka.common.utils.AppInfoParser:124)
[2024-11-19 17:19:23,967] INFO Kafka commitId: 771b9576b00ecf5b (org.apache.kafka.common.utils.AppInfoParser:125)
[2024-11-19 17:19:23,969] INFO Kafka startTimeMs: 1732029563967 (org.apache.kafka.common.utils.AppInfoParser:126)
writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=null, streamFactoryFactory=null, commandListeners=[], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@71d5f582]}, clusterSettings={hosts=[localhost:27017], srvServiceName=mongodb, mode=SINGLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[com.mongodb.kafka.connect.util.ConnectionValidator$1@71c78d91]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', serverMonitorListeners='[]'}, sslSettings=SslSettings{enabled=false, invalidHostNameAllowed=false, context=null}, applicationName='null', compressorList=[], uuidRepresentation=UNSPECIFIED, serverApi=null, autoEncryptionSettings=null, contextProvider=null} (org.mongodb.driver.client:71)
[2024-11-19 17:19:26,837] INFO Opened connection [connectionId{localValue:1, serverValue:899}] to localhost:27017 (org.mongodb.driver.connection:71)
[2024-11-19 17:19:26,837] INFO Opened connection [connectionId{localValue:2, serverValue:898}] to localhost:27017 (org.mongodb.driver.connection:71)
[2024-11-19 17:19:26,838] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=25, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=27288100} (org.mongodb.driver.cluster:71)
[2024-11-19 17:19:26,857] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:372)
[2024-11-19 17:19:26,876] INFO [mongodb-sink|worker] Creating connector mongodb-sink of type com.mongodb.kafka.connect.MongoSinkConnector (org.apache.kafka.connect.runtime.Worker:312)
[2024-11-19 17:19:26,878] INFO [mongodb-sink|worker] SinkConnectorConfig values:
config.action.reload = restart
connector.class = com.mongodb.kafka.connect.MongoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mongodb-sink
predicates = []
tasks.max = 1
tasks.max.enforce = true
topics = [movies]
topics.regex =
transforms = [AddRecordCreatedOn, AddRecordLastUpdatedOn, FormatRecordCreatedOn, FormatRecordLastUpdatedOn, FormatReleasedToTimestamp, FormatLastUpdatedToTimestamp, CustomSmt]
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig:372)
[2024-11-19 17:19:26,882] INFO [mongodb-sink|worker] EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.mongodb.kafka.connect.MongoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mongodb-sink
predicates = []
tasks.max = 1
tasks.max.enforce = true
topics = [movies]
topics.regex =
transforms = [AddRecordCreatedOn, AddRecordLastUpdatedOn, FormatRecordCreatedOn, FormatRecordLastUpdatedOn, FormatReleasedToTimestamp, FormatLastUpdatedToTimestamp, CustomSmt]
transforms.AddRecordCreatedOn.negate = false
transforms.AddRecordCreatedOn.offset.field = null
transforms.AddRecordCreatedOn.partition.field = null
transforms.AddRecordCreatedOn.predicate = null
transforms.AddRecordCreatedOn.static.field = null
transforms.AddRecordCreatedOn.static.value = null
transforms.AddRecordCreatedOn.timestamp.field = recordCreatedOn
transforms.AddRecordCreatedOn.topic.field = null
transforms.AddRecordCreatedOn.type = class org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddRecordLastUpdatedOn.negate = false
transforms.AddRecordLastUpdatedOn.offset.field = null
transforms.AddRecordLastUpdatedOn.partition.field = null
transforms.AddRecordLastUpdatedOn.predicate = null
transforms.AddRecordLastUpdatedOn.static.field = null
transforms.AddRecordLastUpdatedOn.static.value = null
transforms.AddRecordLastUpdatedOn.timestamp.field = recordLastUpdatedOn
transforms.AddRecordLastUpdatedOn.topic.field = null
transforms.AddRecordLastUpdatedOn.type = class org.apache.kafka.connect.transforms.InsertField$Value
transforms.CustomSmt.negate = false
transforms.CustomSmt.predicate = null
transforms.CustomSmt.type = class org.example.DateTransformer
transforms.FormatLastUpdatedToTimestamp.field = released
transforms.FormatLastUpdatedToTimestamp.format = yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
transforms.FormatLastUpdatedToTimestamp.negate = false
transforms.FormatLastUpdatedToTimestamp.predicate = null
transforms.FormatLastUpdatedToTimestamp.target.type = Timestamp
transforms.FormatLastUpdatedToTimestamp.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.FormatLastUpdatedToTimestamp.unix.precision = milliseconds
transforms.FormatRecordCreatedOn.field = recordCreatedOn
transforms.FormatRecordCreatedOn.format = yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
transforms.FormatRecordCreatedOn.negate = false
transforms.FormatRecordCreatedOn.predicate = null
transforms.FormatRecordCreatedOn.target.type = Timestamp
transforms.FormatRecordCreatedOn.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.FormatRecordCreatedOn.unix.precision = milliseconds
transforms.FormatRecordLastUpdatedOn.field = recordLastUpdatedOn
transforms.FormatRecordLastUpdatedOn.format = yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
transforms.FormatRecordLastUpdatedOn.negate = false
transforms.FormatRecordLastUpdatedOn.predicate = null
transforms.FormatRecordLastUpdatedOn.target.type = Timestamp
transforms.FormatRecordLastUpdatedOn.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.FormatRecordLastUpdatedOn.unix.precision = milliseconds
transforms.FormatReleasedToTimestamp.field = lastUpdated
transforms.FormatReleasedToTimestamp.format = yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
transforms.FormatReleasedToTimestamp.negate = false
transforms.FormatReleasedToTimestamp.predicate = null
transforms.FormatReleasedToTimestamp.target.type = Timestamp
transforms.FormatReleasedToTimestamp.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.FormatReleasedToTimestamp.unix.precision = milliseconds
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:372)
[2024-11-19 17:19:26,887] INFO [mongodb-sink|worker] Instantiated connector mongodb-sink with version 1.13.0 of type class com.mongodb.kafka.connect.MongoSinkConnector (org.apache.kafka.connect.runtime.Worker:334)
[2024-11-19 17:19:26,889] INFO [mongodb-sink|worker] Finished creating connector mongodb-sink (org.apache.kafka.connect.runtime.Worker:355)
[2024-11-19 17:19:26,894] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = com.mongodb.kafka.connect.MongoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = mongodb-sink
predicates = []
tasks.max = 1
tasks.max.enforce = true
topics = [movies]
topics.regex =
transforms = [AddRecordCreatedOn, AddRecordLastUpdatedOn, FormatRecordCreatedOn, FormatRecordLastUpdatedOn, FormatReleasedToTimestamp, FormatLastUpdatedToTimestamp, CustomSmt]
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig:372)


More details here: https://stackoverflow.com/questions/791 ... -connector