- Создает каталог JDBC Iceberg на базе PostgreSQL
- Задает для хранилища Iceberg файловую систему Hadoop
Мои текущие файлы
pom.xml
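<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>iceberg-s3-setup</artifactId>
  <version>1.0.0</version>
  <packaging>jar</packaging>
  <name>Iceberg S3 Setup Job</name>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.20.2</flink.version>
    <iceberg.version>1.10.0</iceberg.version>
    <hadoop.version>3.3.6</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-flink-runtime-1.20</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-aws</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.IcebergS3SetupJava</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>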
Я создаю mainClass = com.example.IcebergS3SetupJava (src/main/java/com/example/IcebergS3SetupJava.java)
package com.example;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class IcebergS3SetupJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // 10 seconds
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// Create Iceberg JDBC catalog pointing to MinIO/S3
tEnv.executeSql(
"CREATE CATALOG iceberg_catalog WITH (\n" +
" 'type' = 'iceberg',\n" +
" 'catalog-type' = 'jdbc',\n" +
" 'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog', \n" +
" 'jdbc.uri' = 'jdbc:postgresql://
:/iceberg_catalog',\n" +
" 'jdbc.user' = 'user',\n" +
" 'jdbc.password' = 'pass',\n" +
" 'warehouse' = 'hdfs://:8020/user/warehouse',\n" +
")"
);
// Create sample table
tEnv.executeSql(
"CREATE TABLE iceberg_catalog.default.sample (\n" +
" id BIGINT COMMENT 'unique id',\n" +
" data STRING\n" +
") WITH (\n" +
" 'format-version' = '2'\n" +
")"
);
System.out.println("Iceberg catalog and sample table created successfully on S3!");
// This job only does DDL → execute to make it run
env.execute("Iceberg S3 Catalog & Table Creation");
}
}
Среда:
Flink 1.20.2
Iceberg 1.10.0
PostgreSQL 16
Java 11
Отправить задание
flink run -c com.example.IcebergS3SetupJava target/iceberg-s3-setup-1.0.0.jar
Я отправил задание и получил ошибку:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.util.HadoopUtils
Подробнее здесь: https://stackoverflow.com/questions/798 ... putils-err
Мобильная версия