Мы используем поток Spring Cloud для передачи события в Kafka, и все работало нормально, пока мы не начали вводить кэш Redis.
Я думаю, что по какой-то причине комбинация Redis и Kafka при неудачной сериализации.
На этом этапе кажется, что кеш Redis работает нормально, но выдача события в Kafka не работает и мы получаем приведенное ниже исключение Redis при создании события в Kafka.< /p>
Error while producing SampleEvent: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'sampleOutput';
nested exception is org.springframework.data.redis.serializer.SerializationException: Could not write JSON: Not an array:
{"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException:
Not an array: {"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]), failedMessage=GenericMessage [payload={"field": "value"}, headers={id=16108acf-6ca1-f1b2-2ed3-44eb857daa6b, contentType=application/*+avro, timestamp=1712741537549}]
Весенняя загрузка: 2.4.5
Весенний облачный поток: 2020.0.2
И этот репозиторий GitHub содержит воспроизводимый пример.
https://github.com/krishgokul494/redisandkafka.git
pom.xml
x s i : s c h e m a L o c a t i o n = & q u o t ; h t t p : / / m a v e n . a p a c h e . o r g / P O M / 4 . 0 . 0 h t t p : / / m a v e n . a p a c h e . o r g / x s d / m a v e n - 4 . 0 . 0 . x s d & q u o t ; & g t ; < b r / > & l t ; m o d e l V e r s i o n & g t ; 4 . 0 . 0 & l t ; / m o d e l V e r s i o n & g t ; < b r / > < b r / > & l t ; g r o u p I d & g t ; c o m . s a m p l e & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; r e d i s a n d k a f k a & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; 0 . 0 . 1 - S N A P S H O T & l t ; / v e r s i o n & g t ; < b r / > & l t ; p a c k a g i n g & g t ; j a r & l t ; / p a c k a g i n g & g t ; < b r / > < b r / > & l t ; n a m e & g t ; r e d i s a n d k a f k a & l t ; / n a m e & g t ; < b r / > < b r / > & l t ; p r o p e r t i e s & g t ; < b r / > & l t ; p r o j e c t . b u i l d . s o u r c e E n c o d i n g & g t ; U T F - 8 & l t ; / p r o j e c t . b u i l d . s o u r c e E n c o d i n g & g t ; < b r / > & l t ; p r o j e c t . r e p o r t i n g . o u t p u t E n c o d i n g & g t ; U T F - 8 & l t ; / p r o j e c t . r e p o r t i n g . o u t p u t E n c o d i n g & g t ; < b r / > & l t ; j a v a . v e r s i o n & g t ; 1 . 8 & l t ; / j a v a . v e r s i o n & g t ; < b r / > & l t ; s p r i n g - c l o u d . v e r s i o n & g t ; 2 0 2 0 . 0 . 2 & l t ; / s p r i n g - c l o u d . v e r s i o n & g t ; < b r / > & l t ; s p r i n g - b o o t . v e r s i o n & g t ; 2 . 4 . 5 & l t ; / s p r i n g - b o o t . v e r s i o n & g t ; < b r / > & l t ; a v r o . v e r s i o n & g t ; 1 . 8 . 2 & l t ; / a v r o . v e r s i o n & g t ; < b r / > & l t ; c o n f l u e n t . v e r s i o n & g t ; 4 . 0 . 0 & l t ; / c o n f l u e n t . v e r s i o n & g t ; < b r / > & l t ; j a c o c o . v e r s i o n & g t ; 0 . 8 . 5 & l t ; / j a c o c o . v e r s i o n & g t ; < b r / > & l t ; m a v e n . c o m p i l e r . s o u r c e & g t ; 1 . 8 & l t ; / m a v e n . c o m p i l e r . s o u r c e & g t ; < b r / > & l t ; m a v e n . c o m p i l e r . t a r g e t & g t ; 1 . 8 & l t ; / m a v e n . c o m p i l e r . t a r g e t & g t ; < b r / > & l t ; / p r o p e r t i e s & g t ; < b r / > < b r / > & l t ; d e p e n d e n c i e s & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . c l o u d & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - c l o u d - s c h e m a - r e g i s t r y - c l i e n t & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - a c t u a t o r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - c a c h e & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - d a t a - r e d i s & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - w e b & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; e x c l u s i o n s & g t ; < b r / > & l t ; e x c l u s i o n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . h i b e r n a t e & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; h i b e r n a t e - v a l i d a t o r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / e x c l u s i o n & g t ; < b r / > & l t ; / e x c l u s i o n s & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . p r o j e c t l o m b o k & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; l o m b o k & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; o p t i o n a l & g t ; t r u e & l t ; / o p t i o n a l & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; l o g 4 j & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; l o g 4 j & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; 1 . 2 . 1 7 & l t ; / v e r s i o n & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . c o d e h a u s . j a n i n o & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; j a n i n o & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . c o d e h a u s . j a n i n o & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; c o m m o n s - c o m p i l e r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . c o d e h a u s . j a c k s o n & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; j a c k s o n - m a p p e r - a s l & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; 1 . 9 . 1 3 & l t ; / v e r s i o n & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; i o . c o n f l u e n t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; k a f k a - a v r o - s e r i a l i z e r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { c o n f l u e n t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; e x c l u s i o n s & g t ; < b r / > & l t ; e x c l u s i o n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s l f 4 j & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s l f 4 j - a p i & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / e x c l u s i o n & g t ; < b r / > & l t ; e x c l u s i o n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s l f 4 j & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s l f 4 j - l o g 4 j 1 2 & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / e x c l u s i o n & g t ; < b r / > & l t ; / e x c l u s i o n s & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; i o . c o n f l u e n t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; k a f k a - s c h e m a - r e g i s t r y - c l i e n t & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { c o n f l u e n t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . a p a c h e . k a f k a & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; k a f k a - c l i e n t s & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; / d e p e n d e n c i e s & g t ; < b r / > < b r / > & l t ; d e p e n d e n c y M a n a g e m e n t & g t ; < b r / > & l t ; d e p e n d e n c i e s & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . c l o u d & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - c l o u d - d e p e n d e n c i e s & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { s p r i n g - c l o u d . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; t y p e & g t ; p o m & l t ; / t y p e & g t ; < b r / > & l t ; s c o p e & g t ; i m p o r t & l t ; / s c o p e & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - p a r e n t & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { s p r i n g - b o o t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; t y p e & g t ; p o m & l t ; / t y p e & g t ; < b r / > & l t ; s c o p e & g t ; i m p o r t & l t ; / s c o p e & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; / d e p e n d e n c i e s & g t ; < b r / > & l t ; / d e p e n d e n c y M a n a g e m e n t & g t ; < b r / > < b r / > & l t ; b u i l d & g t ; < b r / > & l t ; p l u g i n s & g t ; < b r / > & l t ; p l u g i n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - m a v e n - p l u g i n & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { s p r i n g - b o o t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; c o n f i g u r a t i o n & g t ; < b r / > & l t ; m a i n C l a s s & g t ; c o m . s a m p l e . r e d i s a n d k a f k a . A p p l i c a t i o n & l t ; / m a i n C l a s s & g t ; < b r / > & l t ; / c o n f i g u r a t i o n & g t ; < b r / > & l t ; e x e c u t i o n s & g t ; < b r / > & l t ; e x e c u t i o n & g t ; < b r / > & l t ; g o a l s & g t ; < b r / > & l t ; g o a l & g t ; r e p a c k a g e & l t ; / g o a l & g t ; < b r / > & l t ; / g o a l s & g t ; < b r / > & l t ; / e x e c u t i o n & g t ; < b r / > & l t ; / e x e c u t i o n s & g t ; < b r / > & l t ; / p l u g i n & g t ; < b r / > & l t ; p l u g i n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . a p a c h e . a v r o & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; a v r o - m a v e n - p l u g i n & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { a v r o . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; e x e c u t i o n s & g t ; < b r / > & l t ; e x e c u t i o n & g t ; < b r / > & l t ; p h a s e & g t ; g e n e r a t e - s o u r c e s & l t ; / p h a s e & g t ; < b r / > & l t ; g o a l s & g t ; < b r / > & l t ; g o a l & g t ; s c h e m a & l t ; / g o a l & g t ; < b r / > & l t ; g o a l & g t ; p r o t o c o l & l t ; / g o a l & g t ; < b r / > & l t ; g o a l & g t ; i d l - p r o t o c o l & l t ; / g o a l & g t ; < b r / > & l t ; / g o a l s & g t ; < b r / > & l t ; c o n f i g u r a t i o n & g t ; < b r /> ${project.basedir}/src/main/resources/avro
org.jacoco
jacoco-maven-plugin
${jacoco.version}
**/avro/**/*
default-prepare-agent
prepare-agent
default-report
prepare-package
report
kafka-binder
true
org.springframework.cloud
spring-cloud-stream-binder-kafka
repo
true
spring-libs-milestones
Spring Milestones
https://repo.spring.io/milestone
false
spring-milestones
Spring libs-Milestones
https://repo.spring.io/libs-milestone/
false
confluent
http://packages.confluent.io/maven/
false
repository.spring.release
Spring GA Repository
https://repo.spring.io/plugins-release/
SampleKafkaSourceProducer.java
package com.sample.redisandkafka.kafka;
import com.sample.redisandkafka.SampleEvent;
import com.sample.redisandkafka.config.SampleOutputSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(SampleOutputSource.class)
@Slf4j
public class SampleKafkaSourceProducer {
@Autowired
private SampleOutputSource sampleOutputSource;
public boolean produceSampleEvent() {
SampleEvent sampleEvent = SampleEvent.newBuilder().setField("value").build();
try {
sampleOutputSource.sampleMessageChannel().send(MessageBuilder.withPayload(sampleEvent).build());
} catch(Exception ex) {
log.error("Error while producing SampleEvent: {}", ex.toString());
return false;
}
return true;
}
}
KafkaConfig.java
package com.sample.redisandkafka.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.SchemaRegistryClient;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;
@Configuration
public class KafkaConfig {
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
private String endpoint;
@Bean
public SchemaRegistryClient confluentSchemaRegistryClient() {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
RedisCacheConfig.java
package com.sample.redisandkafka.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@EnableCaching
@Configuration
@ConditionalOnProperty(name = "spring.cache.enabled", havingValue = "true")
public class RedisCacheConfig extends CachingConfigurerSupport {
@Autowired
private CacheConfigurationProperties cacheConfigurationProperties = null;
private RedisCacheConfiguration createCacheConfiguration(long timeoutInHours) {
return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(timeoutInHours))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
}
@Bean
public CacheManager cacheManager(LettuceConnectionFactory redisConnectionFactory) {
Map cacheConfigurations = new HashMap();
if (Objects.nonNull(cacheConfigurationProperties.getCachesTTL())) {
for (Entry cacheNameAndTimeout : cacheConfigurationProperties.getCachesTTL()
.entrySet()) {
cacheConfigurations.put(cacheNameAndTimeout.getKey(),
createCacheConfiguration(cacheNameAndTimeout.getValue()));
}
}
return RedisCacheManager.builder(redisConnectionFactory)
.cacheDefaults(createCacheConfiguration(cacheConfigurationProperties.getDefaultTTL()))
.withInitialCacheConfigurations(cacheConfigurations).build();
}
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(
cacheConfigurationProperties.getHost(), cacheConfigurationProperties.getPort());
return new LettuceConnectionFactory(redisStandaloneConfiguration);
}
@Configuration
@ConfigurationProperties(prefix = "spring.cache")
@Data
class CacheConfigurationProperties {
private String host;
private int port;
private Long defaultTTL;
private Map cachesTTL;
}
}
SampleInputSink.java
package com.sample.redisandkafka.config;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
public interface SampleInputSink {
String SAMPLE = "sampleInput";
@Input(SAMPLE)
MessageChannel sampleMessageChannel();
}
SampleOutputSource.java
package com.sample.redisandkafka.config;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface SampleOutputSource {
String SAMPLE = "sampleOutput";
@Output(SAMPLE)
MessageChannel sampleMessageChannel();
}
application.yml
spring:
application:
name: redisandkafka
main:
allow-bean-definition-overriding: true
cache:
cache-names: sample-cache
default-ttl: 1 # TTL in hours
enabled: true
type: redis
host: localhost
port: 6379
caches-ttl: # TTL in hours
sample-cache: 1
cloud:
stream:
schemaRegistryClient:
endpoint: http://localhost:9081
kafka:
binder:
brokers: PLAINTEXT_HOST://localhost:30092
min-partition-count: 1
replication-factor: 1
useNativeDecoding: true
bindings:
sampleInput:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
configuration:
schema.registry.url: http://localhost:9081
specific.avro.reader: true
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
max:
poll:
records: 100
interval.ms: 900000
bindings:
sampleInput:
contentType: application/*+avro
destination: redisandkafka.local.sample_event
group: sample.local.sample_event
sampleOutput:
contentType: application/*+avro
destination: redisandkafka.local.sample_event_output
group: sample.local.sample_event
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.StringSerializer
value:
serde: io.confluent.kafka.serializers.KafkaAvroSerializer
security.basic.enable: false
management.security.enabled: false
security.ignored: /**
Sample.asvc
{
"namespace": "com.sample.redisandkafka",
"type": "record",
"name": "SampleEvent",
"fields": [
{"name": "field", "type": "string", "default": "null"}
]
}
Обновление 1:
Используя тот же пример приложения, мы удалили зависимости, связанные с Redis [приведены ниже], и удалили класс RedisCacheConfig class и сделал приложение kafka самостоятельно, и оно работает хорошо
pom.xml
org.springframework.boot
spring-boot-starter-data-redis
application.yml
cache:
cache-names: sample-cache
default-ttl: 1 # TTL in hours
enabled: true
type: redis
host: localhost
port: 6379
caches-ttl: # TTL in hours
sample-cache: 1
Подробнее здесь: https://stackoverflow.com/questions/783 ... tion-error
Spring загрузка Redis и ошибка сериализации Kafka ⇐ JAVA
Программисты JAVA общаются здесь
-
Anonymous
1713253602
Anonymous
Мы используем поток Spring Cloud для передачи события в Kafka, и все работало нормально, пока мы не начали вводить кэш Redis.
Я думаю, что по какой-то причине комбинация Redis и Kafka при неудачной сериализации.
На этом этапе кажется, что кеш Redis работает нормально, но выдача события в Kafka не работает и мы получаем приведенное ниже исключение Redis при создании события в Kafka.< /p>
Error while producing SampleEvent: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'sampleOutput';
nested exception is org.springframework.data.redis.serializer.SerializationException: Could not write JSON: Not an array:
{"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException:
Not an array: {"type":"record","name":"SampleEvent","namespace":"com.sample.redisandkafka","fields":[{"name":"field","type":"string","default":"null"}]} (through reference chain: org.springframework.cloud.schema.registry.ParsedSchema["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]), failedMessage=GenericMessage [payload={"field": "value"}, headers={id=16108acf-6ca1-f1b2-2ed3-44eb857daa6b, contentType=application/*+avro, timestamp=1712741537549}]
Весенняя загрузка: 2.4.5
Весенний облачный поток: 2020.0.2
И этот репозиторий GitHub содержит воспроизводимый пример.
https://github.com/krishgokul494/redisandkafka.git
pom.xml
x s i : s c h e m a L o c a t i o n = & q u o t ; h t t p : / / m a v e n . a p a c h e . o r g / P O M / 4 . 0 . 0 h t t p : / / m a v e n . a p a c h e . o r g / x s d / m a v e n - 4 . 0 . 0 . x s d & q u o t ; & g t ; < b r / > & l t ; m o d e l V e r s i o n & g t ; 4 . 0 . 0 & l t ; / m o d e l V e r s i o n & g t ; < b r / > < b r / > & l t ; g r o u p I d & g t ; c o m . s a m p l e & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; r e d i s a n d k a f k a & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; 0 . 0 . 1 - S N A P S H O T & l t ; / v e r s i o n & g t ; < b r / > & l t ; p a c k a g i n g & g t ; j a r & l t ; / p a c k a g i n g & g t ; < b r / > < b r / > & l t ; n a m e & g t ; r e d i s a n d k a f k a & l t ; / n a m e & g t ; < b r / > < b r / > & l t ; p r o p e r t i e s & g t ; < b r / > & l t ; p r o j e c t . b u i l d . s o u r c e E n c o d i n g & g t ; U T F - 8 & l t ; / p r o j e c t . b u i l d . s o u r c e E n c o d i n g & g t ; < b r / > & l t ; p r o j e c t . r e p o r t i n g . o u t p u t E n c o d i n g & g t ; U T F - 8 & l t ; / p r o j e c t . r e p o r t i n g . o u t p u t E n c o d i n g & g t ; < b r / > & l t ; j a v a . v e r s i o n & g t ; 1 . 8 & l t ; / j a v a . v e r s i o n & g t ; < b r / > & l t ; s p r i n g - c l o u d . v e r s i o n & g t ; 2 0 2 0 . 0 . 2 & l t ; / s p r i n g - c l o u d . v e r s i o n & g t ; < b r / > & l t ; s p r i n g - b o o t . v e r s i o n & g t ; 2 . 4 . 5 & l t ; / s p r i n g - b o o t . v e r s i o n & g t ; < b r / > & l t ; a v r o . v e r s i o n & g t ; 1 . 8 . 2 & l t ; / a v r o . v e r s i o n & g t ; < b r / > & l t ; c o n f l u e n t . v e r s i o n & g t ; 4 . 0 . 0 & l t ; / c o n f l u e n t . v e r s i o n & g t ; < b r / > & l t ; j a c o c o . v e r s i o n & g t ; 0 . 8 . 5 & l t ; / j a c o c o . v e r s i o n & g t ; < b r / > & l t ; m a v e n . c o m p i l e r . s o u r c e & g t ; 1 . 8 & l t ; / m a v e n . c o m p i l e r . s o u r c e & g t ; < b r / > & l t ; m a v e n . c o m p i l e r . t a r g e t & g t ; 1 . 8 & l t ; / m a v e n . c o m p i l e r . t a r g e t & g t ; < b r / > & l t ; / p r o p e r t i e s & g t ; < b r / > < b r / > & l t ; d e p e n d e n c i e s & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . c l o u d & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - c l o u d - s c h e m a - r e g i s t r y - c l i e n t & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - a c t u a t o r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - c a c h e & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - d a t a - r e d i s & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - w e b & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; e x c l u s i o n s & g t ; < b r / > & l t ; e x c l u s i o n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . h i b e r n a t e & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; h i b e r n a t e - v a l i d a t o r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / e x c l u s i o n & g t ; < b r / > & l t ; / e x c l u s i o n s & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . p r o j e c t l o m b o k & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; l o m b o k & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; o p t i o n a l & g t ; t r u e & l t ; / o p t i o n a l & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; l o g 4 j & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; l o g 4 j & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; 1 . 2 . 1 7 & l t ; / v e r s i o n & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . c o d e h a u s . j a n i n o & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; j a n i n o & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . c o d e h a u s . j a n i n o & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; c o m m o n s - c o m p i l e r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . c o d e h a u s . j a c k s o n & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; j a c k s o n - m a p p e r - a s l & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; 1 . 9 . 1 3 & l t ; / v e r s i o n & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; i o . c o n f l u e n t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; k a f k a - a v r o - s e r i a l i z e r & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { c o n f l u e n t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; e x c l u s i o n s & g t ; < b r / > & l t ; e x c l u s i o n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s l f 4 j & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s l f 4 j - a p i & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / e x c l u s i o n & g t ; < b r / > & l t ; e x c l u s i o n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s l f 4 j & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s l f 4 j - l o g 4 j 1 2 & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / e x c l u s i o n & g t ; < b r / > & l t ; / e x c l u s i o n s & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; i o . c o n f l u e n t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; k a f k a - s c h e m a - r e g i s t r y - c l i e n t & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { c o n f l u e n t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . a p a c h e . k a f k a & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; k a f k a - c l i e n t s & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; / d e p e n d e n c i e s & g t ; < b r / > < b r / > & l t ; d e p e n d e n c y M a n a g e m e n t & g t ; < b r / > & l t ; d e p e n d e n c i e s & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . c l o u d & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - c l o u d - d e p e n d e n c i e s & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { s p r i n g - c l o u d . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; t y p e & g t ; p o m & l t ; / t y p e & g t ; < b r / > & l t ; s c o p e & g t ; i m p o r t & l t ; / s c o p e & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; d e p e n d e n c y & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - s t a r t e r - p a r e n t & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { s p r i n g - b o o t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; t y p e & g t ; p o m & l t ; / t y p e & g t ; < b r / > & l t ; s c o p e & g t ; i m p o r t & l t ; / s c o p e & g t ; < b r / > & l t ; / d e p e n d e n c y & g t ; < b r / > & l t ; / d e p e n d e n c i e s & g t ; < b r / > & l t ; / d e p e n d e n c y M a n a g e m e n t & g t ; < b r / > < b r / > & l t ; b u i l d & g t ; < b r / > & l t ; p l u g i n s & g t ; < b r / > & l t ; p l u g i n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . s p r i n g f r a m e w o r k . b o o t & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; s p r i n g - b o o t - m a v e n - p l u g i n & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { s p r i n g - b o o t . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; c o n f i g u r a t i o n & g t ; < b r / > & l t ; m a i n C l a s s & g t ; c o m . s a m p l e . r e d i s a n d k a f k a . A p p l i c a t i o n & l t ; / m a i n C l a s s & g t ; < b r / > & l t ; / c o n f i g u r a t i o n & g t ; < b r / > & l t ; e x e c u t i o n s & g t ; < b r / > & l t ; e x e c u t i o n & g t ; < b r / > & l t ; g o a l s & g t ; < b r / > & l t ; g o a l & g t ; r e p a c k a g e & l t ; / g o a l & g t ; < b r / > & l t ; / g o a l s & g t ; < b r / > & l t ; / e x e c u t i o n & g t ; < b r / > & l t ; / e x e c u t i o n s & g t ; < b r / > & l t ; / p l u g i n & g t ; < b r / > & l t ; p l u g i n & g t ; < b r / > & l t ; g r o u p I d & g t ; o r g . a p a c h e . a v r o & l t ; / g r o u p I d & g t ; < b r / > & l t ; a r t i f a c t I d & g t ; a v r o - m a v e n - p l u g i n & l t ; / a r t i f a c t I d & g t ; < b r / > & l t ; v e r s i o n & g t ; $ { a v r o . v e r s i o n } & l t ; / v e r s i o n & g t ; < b r / > & l t ; e x e c u t i o n s & g t ; < b r / > & l t ; e x e c u t i o n & g t ; < b r / > & l t ; p h a s e & g t ; g e n e r a t e - s o u r c e s & l t ; / p h a s e & g t ; < b r / > & l t ; g o a l s & g t ; < b r / > & l t ; g o a l & g t ; s c h e m a & l t ; / g o a l & g t ; < b r / > & l t ; g o a l & g t ; p r o t o c o l & l t ; / g o a l & g t ; < b r / > & l t ; g o a l & g t ; i d l - p r o t o c o l & l t ; / g o a l & g t ; < b r / > & l t ; / g o a l s & g t ; < b r / > & l t ; c o n f i g u r a t i o n & g t ; < b r /> ${project.basedir}/src/main/resources/avro
org.jacoco
jacoco-maven-plugin
${jacoco.version}
**/avro/**/*
default-prepare-agent
prepare-agent
default-report
prepare-package
report
kafka-binder
true
org.springframework.cloud
spring-cloud-stream-binder-kafka
repo
true
spring-libs-milestones
Spring Milestones
https://repo.spring.io/milestone
false
spring-milestones
Spring libs-Milestones
https://repo.spring.io/libs-milestone/
false
confluent
http://packages.confluent.io/maven/
false
repository.spring.release
Spring GA Repository
https://repo.spring.io/plugins-release/
SampleKafkaSourceProducer.java
package com.sample.redisandkafka.kafka;
import com.sample.redisandkafka.SampleEvent;
import com.sample.redisandkafka.config.SampleOutputSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(SampleOutputSource.class)
@Slf4j
public class SampleKafkaSourceProducer {
@Autowired
private SampleOutputSource sampleOutputSource;
public boolean produceSampleEvent() {
SampleEvent sampleEvent = SampleEvent.newBuilder().setField("value").build();
try {
sampleOutputSource.sampleMessageChannel().send(MessageBuilder.withPayload(sampleEvent).build());
} catch(Exception ex) {
log.error("Error while producing SampleEvent: {}", ex.toString());
return false;
}
return true;
}
}
KafkaConfig.java
package com.sample.redisandkafka.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.SchemaRegistryClient;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;
@Configuration
public class KafkaConfig {
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
private String endpoint;
@Bean
public SchemaRegistryClient confluentSchemaRegistryClient() {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
RedisCacheConfig.java
package com.sample.redisandkafka.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@EnableCaching
@Configuration
@ConditionalOnProperty(name = "spring.cache.enabled", havingValue = "true")
public class RedisCacheConfig extends CachingConfigurerSupport {
@Autowired
private CacheConfigurationProperties cacheConfigurationProperties = null;
private RedisCacheConfiguration createCacheConfiguration(long timeoutInHours) {
return RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(timeoutInHours))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
}
@Bean
public CacheManager cacheManager(LettuceConnectionFactory redisConnectionFactory) {
Map cacheConfigurations = new HashMap();
if (Objects.nonNull(cacheConfigurationProperties.getCachesTTL())) {
for (Entry cacheNameAndTimeout : cacheConfigurationProperties.getCachesTTL()
.entrySet()) {
cacheConfigurations.put(cacheNameAndTimeout.getKey(),
createCacheConfiguration(cacheNameAndTimeout.getValue()));
}
}
return RedisCacheManager.builder(redisConnectionFactory)
.cacheDefaults(createCacheConfiguration(cacheConfigurationProperties.getDefaultTTL()))
.withInitialCacheConfigurations(cacheConfigurations).build();
}
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(
cacheConfigurationProperties.getHost(), cacheConfigurationProperties.getPort());
return new LettuceConnectionFactory(redisStandaloneConfiguration);
}
@Configuration
@ConfigurationProperties(prefix = "spring.cache")
@Data
class CacheConfigurationProperties {
private String host;
private int port;
private Long defaultTTL;
private Map cachesTTL;
}
}
SampleInputSink.java
package com.sample.redisandkafka.config;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
public interface SampleInputSink {
String SAMPLE = "sampleInput";
@Input(SAMPLE)
MessageChannel sampleMessageChannel();
}
SampleOutputSource.java
package com.sample.redisandkafka.config;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface SampleOutputSource {
String SAMPLE = "sampleOutput";
@Output(SAMPLE)
MessageChannel sampleMessageChannel();
}
application.yml
spring:
application:
name: redisandkafka
main:
allow-bean-definition-overriding: true
cache:
cache-names: sample-cache
default-ttl: 1 # TTL in hours
enabled: true
type: redis
host: localhost
port: 6379
caches-ttl: # TTL in hours
sample-cache: 1
cloud:
stream:
schemaRegistryClient:
endpoint: http://localhost:9081
kafka:
binder:
brokers: PLAINTEXT_HOST://localhost:30092
min-partition-count: 1
replication-factor: 1
useNativeDecoding: true
bindings:
sampleInput:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
configuration:
schema.registry.url: http://localhost:9081
specific.avro.reader: true
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
max:
poll:
records: 100
interval.ms: 900000
bindings:
sampleInput:
contentType: application/*+avro
destination: redisandkafka.local.sample_event
group: sample.local.sample_event
sampleOutput:
contentType: application/*+avro
destination: redisandkafka.local.sample_event_output
group: sample.local.sample_event
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.StringSerializer
value:
serde: io.confluent.kafka.serializers.KafkaAvroSerializer
security.basic.enable: false
management.security.enabled: false
security.ignored: /**
Sample.asvc
{
"namespace": "com.sample.redisandkafka",
"type": "record",
"name": "SampleEvent",
"fields": [
{"name": "field", "type": "string", "default": "null"}
]
}
[b]Обновление 1:[/b]
Используя тот же пример приложения, мы удалили зависимости, связанные с Redis [приведены ниже], и удалили класс RedisCacheConfig class и сделал приложение kafka самостоятельно, и оно работает хорошо
pom.xml
org.springframework.boot
spring-boot-starter-data-redis
application.yml
cache:
cache-names: sample-cache
default-ttl: 1 # TTL in hours
enabled: true
type: redis
host: localhost
port: 6379
caches-ttl: # TTL in hours
sample-cache: 1
Подробнее здесь: [url]https://stackoverflow.com/questions/78304087/spring-boot-redis-and-kafka-serialization-error[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия