Код: Выделить всё
**Config TaskExecutor:**
@Bean("threadImport")
public TaskExecutor threadImport() {
log.info("*** init threadImport with corePoolSize [{}] maxPoolSize [{}]",
threadImportCorePoolSize, threadImportMaxPoolSize);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); /* 20 poolSize */
executor.setMaxPoolSize(100); /* 100 max poolSize */
executor.setQueueCapacity(1024000); //1024000 default queue capacity
executor.setThreadNamePrefix("threadImport-");
return executor;
}
**Detail consumeT24CustCorpVip: Parallel processing of messages received when consuming kafka**
public void consumeT24CustCorpVip(List messages) throws ExecutionException, InterruptedException {
List lstCompletableFutures = new ArrayList();
if (!CollectionUtils.isEmpty(messages)) {
Gson gson = new GsonBuilder().registerTypeAdapter(LocalDate.class, new LocalDateAdapter()).create(); //create bean Gson custom
messages.stream().parallel().forEach(message -> {
lstCompletableFutures.add(executedThreadT24CorpVip.executeT24CustCorpVip(message, gson));
}); //Parallel processing of messages received when consuming kafka, use
}
}
**Detail executeT24CustCorpVip: convert message to entity and save to database:**
@Async("threadImport") //Use config bean threadImport
public CompletableFuture executeT24CustCorpVip(String message, Gson gson) {
BaseResponse baseResponse = new BaseResponse();
try {
T24VpbCustCorpVip t24VpbCustCorpVip = gson.fromJson(message, T24VpbCustCorpVip.class); //convert message to entity
t24CustCorpVipRepository.save(t24VpbCustCorpVip); //save database
baseResponse.setStatus(Status.SUCCESS);
} catch (Exception e) {
baseResponse.setStatus(Status.ERROR);
}
return CompletableFuture.completedFuture(baseResponse); //response CompletableFuture
}
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void vn.com.vpbank.corp.api.AirportLoungeController.consumeT24CustCorpVip(java.util.List) throws java.util.concurrent.ExecutionException,java.lang.InterruptedException' threw exception; nested exception is java.lang.NullPointerException; nested exception is java.lang.NullPointerException
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1641)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:1387)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1274)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1179)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1162)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException: null
at vn.com.vpbank.corp.service.airportLounge.impl.AirportLoungeServiceImpl.consumeT24CustCorpVip(AirportLoungeServiceImpl.java:352)
at vn.com.vpbank.corp.service.airportLounge.impl.AirportLoungeServiceImpl$$FastClassBySpringCGLIB$$b3d2718d.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:685)
at vn.com.vpbank.corp.service.airportLounge.impl.AirportLoungeServiceImpl$$EnhancerBySpringCGLIB$$7bfb2704.consumeT24CustCorpVip()
at vn.com.vpbank.corp.api.AirportLoungeController.consumeT24CustCorpVip(AirportLoungeController.java:104)
at sun.reflect.GeneratedMethodAccessor428.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:301)
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:141)
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:133)
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:58)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1359)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1322)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1262)
... 7 common frames omitted
< /code>
Я не понимаю, почему здесь появляется ошибка Nullpointer? Так что этот метод executet24custcorpvip всегда возвращает объект без нуля.
может кто -нибудь объяснить мне это?
Подробнее здесь: https://stackoverflow.com/questions/796 ... rs-in-java