Описание проблемы
Я обнаружил исключение StreamTaskException при попытке выполнить задание Flink с помощью следующей команды:
./flink run flink-stream-app/target/flink-stream-app-1.0-SNAPSHOT.jar
Сводка ошибок
В ошибке указано, что класс RandomBinaryGenerator не удалось загрузить. Ниже приведена соответствующая часть трассировки стека:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: невозможно загрузить пользовательский класс: org.example.RandomBinaryGenerator
Вызвано автор: java.lang.ClassNotFoundException: org.example.RandomBinaryGenerator
Это исключение возникло во время восстановления задания, как описано в ИсправленоDelayRestartBackoffTimeStrategy. Загрузчику классов не удалось разрешить класс RandomBinaryGenerator.
Проверка
Похоже, что JAR-файл содержит требуемый класс:
jar tvf target/flink-stream-app-1.0-SNAPSHOT.jar | grep RandomBinaryGenerator
1855 Thu Jan 09 14:48:48 CET 2025 org/example/RandomBinaryGenerator.class
Запуск потока данных в табличной среде с временным представлением
Цельyour text
Я пытаюсь запустить поток данных в табличной среде с временным представлением. Моя цель — обеспечить непрерывную работу DataStream, чтобы я мог интегрировать его позже с помощью jdbc_fdw. Текущее содержимое DataStream не имеет отношения к этому этапу.
Код
Основной код приложения
package org.example;import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class App {
public static void main(String[] args) throws Exception {
String ipAddress = "127.0.0.1";
int port = 8083;
Configuration configuration = new Configuration();
// Setting up the configuration
configuration.setString("rest.address", ipAddress);
configuration.setInteger("rest.port", port);
configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
configuration.setInteger("taskmanager.memory.process.size", 1024);
configuration.setString("table.exec.resource.default-parallelism", "1");
configuration.setBoolean("table.dynamic-table-options.enabled", true);
configuration.setString("restart-strategy", "fixed-delay");
configuration.setString("restart-strategy.fixed-delay.attempts", "3");
configuration.setString("restart-strategy.fixed-delay.delay", "10s");
// MiniCluster configuration
MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(1)
.build();
MiniCluster miniCluster = new MiniCluster(miniClusterConfig);
miniCluster.start();
try {
// Creating StreamExecutionEnvironment and StreamTableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Adding a source stream
DataStream sourceStream = env.addSource(new RandomBinaryGenerator());
// Registering a catalog and using it
tEnv.executeSql("CREATE CATALOG myCatalog WITH ('type'='generic_in_memory')");
tEnv.useCatalog("myCatalog");
// Printing the stream to console
sourceStream.print().name("Console Sink");
// Converting DataStream to Table and creating a temporary view
Table sourceTable = tEnv.fromDataStream(sourceStream,
Schema.newBuilder()
.column("f0", DataTypes.INT())
.build());
tEnv.createTemporaryView("stream_job", sourceTable);
// Executing the job and starting the Flink cluster
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobID jobId = miniCluster.submitJob(jobGraph).get().getJobID();
System.out.println("Flink Job started with JobID: " + jobId);
System.out.println("Flink Cluster running at: http://" + ipAddress + ":" + port);
System.out.println("Press Enter to stop...");
System.in.read();
} finally {
// Closing the cluster
miniCluster.close();
}
}
}
Исходный код случайного двоичного генератора
package org.example;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.Serializable;
public class RandomBinaryGenerator implements SourceFunction, Serializable {
private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect((int) (Math.random() * 2));
}
Thread.sleep(1000); // Simulating data generation delay
}
}
@Override
public void cancel() {
isRunning = false;
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... olve-class
Мобильная версия