Чтение данных из Kafka и запись в Hive.
При запуске приложения создаётся соединение с Kafka, создаётся каталог, устанавливается диалект Hive и выполняется создание таблицы в Hive.
При выполнении CREATE HIVE TABLE во flink я получаю сообщение об ошибке:
Код: Выделить всё
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_412]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.19.0.jar:1.19.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_412]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_412]
at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka52c5a613-0102-42f6-86ff-a2a04d0f7629.jar:1.19.0]
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57) [flink-rpc-akka52c5a613-0102-42f6-86ff-a2a04d0f7629.jar:1.19.0]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_412]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_412]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_412]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_412]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.delegation.ParserFactory' in the classpath.
Available factory identifiers are:
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.19.0.jar:1.19.0]
... 12 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.delegation.ParserFactory' in the classpath.
Код: Выделить всё
package org.my.nrt;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Main {
public static void main(String[] args) {
String createCatalog = "CREATE CATALOG hive WITH (\n" +
" 'type' = 'hive',\n" +
" 'hive-conf-dir' = '/etc/hive/conf'\n" +
");";
String useCatalog = "USE CATALOG hive;";
String ddlTable = "create table if not exists default.tab_1\n" +
"(\n" +
" col_1 string,\n" +
" col_2 string,\n" +
" col_3 string,\n" +
" col_4 string\n" +
")\n" +
"partitioned by (dt STRING, hr STRING)\n" +
"stored as textfile\n" +
"tblproperties (\n" +
" 'sink.partition-commit.trigger' = 'partition-time',\n" +
" 'sink.partition-commit.delay' = '1 min',\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
" 'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'\n" +
");";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(createCatalog);
tEnv.executeSql(useCatalog);
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql(ddlTable);
}
}
Почему я получаю эту ошибку?
Подробнее здесь: https://stackoverflow.com/questions/789 ... implements