Flink сталкивается с StreamTaskExeption, поскольку он не может разрешить класс.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Flink сталкивается с StreamTaskExeption, поскольку он не может разрешить класс.

Сообщение Anonymous »

StreamTaskException: ClassNotFoundException для RandomBinaryGenerator
Описание проблемы
Я обнаружил исключение StreamTaskException при попытке выполнить задание Flink с помощью следующей команды:

./flink run flink-stream-app/target/flink-stream-app-1.0-SNAPSHOT.jar

Сводка ошибок
В ошибке указано, что класс RandomBinaryGenerator не удалось загрузить. Ниже приведена соответствующая часть трассировки стека:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: невозможно загрузить пользовательский класс: org.example.RandomBinaryGenerator
Вызвано автор: java.lang.ClassNotFoundException: org.example.RandomBinaryGenerator
Это исключение возникло во время восстановления задания, как описано в ИсправленоDelayRestartBackoffTimeStrategy. Загрузчику классов не удалось разрешить класс RandomBinaryGenerator.
Проверка
Похоже, что JAR-файл содержит требуемый класс:
jar tvf target/flink-stream-app-1.0-SNAPSHOT.jar | grep RandomBinaryGenerator
1855 Thu Jan 09 14:48:48 CET 2025 org/example/RandomBinaryGenerator.class

Запуск потока данных в табличной среде с временным представлением
Цельyour text
Я пытаюсь запустить поток данных в табличной среде с временным представлением. Моя цель — обеспечить непрерывную работу DataStream, чтобы я мог интегрировать его позже с помощью jdbc_fdw. Текущее содержимое DataStream не имеет отношения к этому этапу.
Код

Основной код приложения

package org.example;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class App {
public static void main(String[] args) throws Exception {
String ipAddress = "127.0.0.1";
int port = 8083;
Configuration configuration = new Configuration();

// Setting up the configuration
configuration.setString("rest.address", ipAddress);
configuration.setInteger("rest.port", port);
configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
configuration.setInteger("taskmanager.memory.process.size", 1024);
configuration.setString("table.exec.resource.default-parallelism", "1");
configuration.setBoolean("table.dynamic-table-options.enabled", true);
configuration.setString("restart-strategy", "fixed-delay");
configuration.setString("restart-strategy.fixed-delay.attempts", "3");
configuration.setString("restart-strategy.fixed-delay.delay", "10s");

// MiniCluster configuration
MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(1)
.build();

MiniCluster miniCluster = new MiniCluster(miniClusterConfig);
miniCluster.start();

try {
// Creating StreamExecutionEnvironment and StreamTableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);

// Adding a source stream
DataStream sourceStream = env.addSource(new RandomBinaryGenerator());

// Registering a catalog and using it
tEnv.executeSql("CREATE CATALOG myCatalog WITH ('type'='generic_in_memory')");
tEnv.useCatalog("myCatalog");

// Printing the stream to console
sourceStream.print().name("Console Sink");

// Converting DataStream to Table and creating a temporary view
Table sourceTable = tEnv.fromDataStream(sourceStream,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.build());
tEnv.createTemporaryView("stream_job", sourceTable);

// Executing the job and starting the Flink cluster
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobID jobId = miniCluster.submitJob(jobGraph).get().getJobID();

System.out.println("Flink Job started with JobID: " + jobId);
System.out.println("Flink Cluster running at: http://" + ipAddress + ":" + port);
System.out.println("Press Enter to stop...");
System.in.read();

} finally {
// Closing the cluster
miniCluster.close();
}
}
}

Исходный код случайного двоичного генератора

package org.example;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.Serializable;

public class RandomBinaryGenerator implements SourceFunction, Serializable {
private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true;

@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect((int) (Math.random() * 2));
}
Thread.sleep(1000); // Simulating data generation delay
}
}

@Override
public void cancel() {
isRunning = false;
}
}


Подробнее здесь: https://stackoverflow.com/questions/793 ... olve-class
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»