У меня есть проект, который предоставляет клиенту графический интерфейс, где клиент может использовать функцию перетаскивания и настраивать источник и приемник Flink. Компонент графического пользовательского интерфейса поддерживает несколько типов источника и приемника, таких как Kafka, File, DB и т. д., а также несколько типов для сериализатора/десериализатора. У нас есть несколько предопределенных функций карты в графическом интерфейсе, где пользователь может предоставить схему объекта JSON. На серверной стороне мы составляем Java-код на основе конфигурации графического интерфейса, конвертируем его в JAR и отображаем во Flink.
Это отлично работает на машине с Unix. Но при написании интеграционных тестов с использованием MiniClusterWithClientResource я столкнулся с проблемой, связанной с Java-кодом, создаваемым динамически на основе конфигурации графического интерфейса.
Я пробовал зарегистрировать свои классы, а также jar с помощью URLClassLoader
URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File(jarName).toURI().toURL() });
Thread.currentThread().setContextClassLoader(classLoader);
Я также зарегистрировал путь к динамически генерируемым jar-файлам в pom в конфигурации maven-surefire-plugin
Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
[mini-cluster-io-thread-2] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 155cebff305d242ab4b9750b0730f0a4 has been registered for cleanup in the JobResultStore after reaching a terminal state.
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:134)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:346)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
Caused by: java.lang.ClassNotFoundException: com.Employee.AverageAge
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2025)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1892)
at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1855)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:500)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:458)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
Подробнее здесь: https://stackoverflow.com/questions/792 ... rated-code
MiniClusterWithClientResource не удается найти динамически сгенерированный код ⇐ JAVA
Программисты JAVA общаются здесь
1732980342
Anonymous
У меня есть проект, который предоставляет клиенту графический интерфейс, где клиент может использовать функцию перетаскивания и настраивать источник и приемник Flink. Компонент графического пользовательского интерфейса поддерживает несколько типов источника и приемника, таких как Kafka, File, DB и т. д., а также несколько типов для сериализатора/десериализатора. У нас есть несколько предопределенных функций карты в графическом интерфейсе, где пользователь может предоставить схему объекта JSON. На серверной стороне мы составляем Java-код на основе конфигурации графического интерфейса, конвертируем его в JAR и отображаем во Flink.
Это отлично работает на машине с Unix. Но при написании интеграционных тестов с использованием [b]MiniClusterWithClientResource[/b] я столкнулся с проблемой, связанной с Java-кодом, создаваемым динамически на основе конфигурации графического интерфейса.
Я пробовал зарегистрировать свои классы, а также jar с помощью [b]URLClassLoader[/b]
URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File(jarName).toURI().toURL() });
Thread.currentThread().setContextClassLoader(classLoader);
Я также зарегистрировал путь к динамически генерируемым jar-файлам в pom в конфигурации [b]maven-surefire-plugin[/b]
Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
[mini-cluster-io-thread-2] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job 155cebff305d242ab4b9750b0730f0a4 has been registered for cleanup in the JobResultStore after reaching a terminal state.
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:134)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:346)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
Caused by: java.lang.ClassNotFoundException: com.Employee.AverageAge
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2025)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1892)
at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1855)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:500)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:458)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
Подробнее здесь: [url]https://stackoverflow.com/questions/79239858/miniclusterwithclientresource-unable-to-locate-dynamically-generated-code[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия