I get the error "No FileSystem for scheme \"s3\"" when I run my test:
Container.ExecResult(exitCode=1, stdout=, stderr=WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator (file:/opt/flink/lib/flink-dist-1.20.1.jar) to field java.util.Collections$UnmodifiableMap.m
WARNING: Please consider reporting this to the maintainers of org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No FileSystem for scheme "s3"
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:373)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.parquet.hadoop.ParquetReader$Builder.build(ParquetReader.java:391)
at com.xyz.odl.util.parquet.JsonParquetReader.readParquetFileAsCollection(JsonParquetReader.java:53)
at com.xyz.odl.BackfillDynamoJob.main(BackfillDynamoJob.java:47)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
... 9 more
)
Here is my test setup:
private GenericContainer constructAndStartFlinkJobManager() {
String flinkConfYamlPath =
Paths.get("src/test/resources/flink-conf.yaml").toAbsolutePath().toString();
String flinkS3FsPrestoJar =
Paths.get("src/test/resources/plugins/flink-s3-fs-presto-1.20.1.jar")
.toAbsolutePath()
.toString();
try {
GenericContainer flinkJobManagerContainer =
new GenericContainer(DockerImageName.parse("arm64v8/flink:1.20.1-java11"))
.withCopyFileToContainer(
MountableFile.forHostPath(propertiesJsonFile.getAbsolutePath()),
"/opt/flink/conf/application-properties.json")
.withCopyFileToContainer(
MountableFile.forHostPath(flinkConfYamlPath), "/opt/flink/conf/flink-conf.yaml")
.withNetwork(network)
.withNetworkAliases("jobmanager")
.withCommand("jobmanager")
.withExposedPorts(8081)
.withEnv("JOB_MANAGER_RPC_ADDRESS", "jobmanager")
.withEnv(
Map.of(
"AWS_ACCESS_KEY_ID", "test-access-key",
"AWS_SECRET_ACCESS_KEY", "test-secret-key",
"AWS_REGION", "us-east-1",
"AWS_ENDPOINT_URL_S3", "http://localstack:4566"))
.waitingFor(
Wait.forHttp("/").forPort(8081).withStartupTimeout(Duration.ofSeconds(120)));
flinkJobManagerContainer.start();
ExecResult mkdirResult = flinkJobManagerContainer.execInContainer(
"mkdir", "-p", "/opt/flink/plugins/s3-fs-presto");
if (mkdirResult.getExitCode() != 0) {
throw new RuntimeException("Failed to create directory: " + mkdirResult.getStderr());
}
ExecResult execResult = flinkJobManagerContainer.execInContainer(
"cp",
"/opt/flink/opt/flink-s3-fs-presto-1.20.1.jar",
"/opt/flink/plugins/s3-fs-presto/");
if (execResult.getExitCode() != 0) {
throw new RuntimeException("Failed to copy plugin JAR: " + execResult.getStderr());
}
return flinkJobManagerContainer;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void constructAndStartTaskJobManager(GenericContainer flinkJobManagerContainer) {
String flinkConfYamlPath =
Paths.get("src/test/resources/flink-conf.yaml").toAbsolutePath().toString();
String flinkS3FsPrestoJar =
Paths.get("src/test/resources/plugins/flink-s3-fs-presto-1.20.1.jar")
.toAbsolutePath()
.toString();
try {
new GenericContainer(DockerImageName.parse("arm64v8/flink:1.20.1-java11"))
.withCopyFileToContainer(
MountableFile.forHostPath(propertiesJsonFile.getAbsolutePath()),
"/opt/flink/conf/application-properties.json")
.withCopyFileToContainer(
MountableFile.forHostPath(flinkConfYamlPath), "/opt/flink/conf/flink-conf.yaml")
.withCopyFileToContainer(
MountableFile.forHostPath(flinkS3FsPrestoJar),
"/opt/flink/plugins/s3-fs-presto/flink-s3-fs-presto-1.20.1.jar")
.withNetwork(network)
.dependsOn(flinkJobManagerContainer)
.withNetworkAliases("taskmanager")
.withCommand("taskmanager")
.withEnv("JOB_MANAGER_RPC_ADDRESS", "jobmanager")
.withEnv(
Map.of(
"AWS_ACCESS_KEY_ID", "test-access-key",
"AWS_SECRET_ACCESS_KEY", "test-secret-key",
"AWS_REGION", "us-east-1",
"AWS_ENDPOINT_URL_S3", "http://localstack:4566"))
.waitingFor(
Wait.forLogMessage(".*Successful registration at resource manager.*", 1)
.withStartupTimeout(Duration.ofSeconds(120)))
.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
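As an aside on the plugin installation itself: the official Flink Docker images can install a bundled filesystem plugin at startup via the ENABLE_BUILT_IN_PLUGINS environment variable, which copies the named jar from /opt/flink/opt/ into its own directory under /opt/flink/plugins/ before the process starts. A minimal sketch of what that would look like in the Testcontainers setup above (this replaces the manual mkdir/cp exec steps; it must be set on both the jobmanager and the taskmanager container):

```java
// Sketch: let the image's entrypoint install the bundled S3 plugin.
GenericContainer<?> jobManager =
    new GenericContainer<>(DockerImageName.parse("arm64v8/flink:1.20.1-java11"))
        .withCommand("jobmanager")
        .withEnv("JOB_MANAGER_RPC_ADDRESS", "jobmanager")
        // Copies /opt/flink/opt/flink-s3-fs-presto-1.20.1.jar into
        // /opt/flink/plugins/flink-s3-fs-presto/ before startup.
        .withEnv("ENABLE_BUILT_IN_PLUGINS", "flink-s3-fs-presto-1.20.1.jar");
```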
The flink-s3-fs-presto jar is in the plugins directory, as mentioned here.
All I am trying to do in my project is: given an S3 bucket containing a Parquet file, read it, convert each record to a JSON string, and sink it. My flink-conf.yaml is:
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.flink.size: 1024mb
jobmanager.memory.flink.size: 1024mb
jobmanager.rpc.address: jobmanager
s3.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
s3.fs.s3a.access.key: test
s3.fs.s3a.secret.key: test
s3.fs.s3a.endpoint: http://localstack:4566
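Note where the stack trace actually fails: inside org.apache.hadoop.fs.FileSystem, reached via ParquetReader in JsonParquetReader, not inside Flink's own filesystem layer. The flink-s3-fs-presto plugin only registers the s3:// scheme with Flink's FileSystem abstraction; Hadoop's FileSystem class never sees it, and it also does not read the s3.fs.s3a.* keys from flink-conf.yaml. One way out, assuming hadoop-aws (and the AWS SDK bundle) is on the job's classpath, is to pass an explicit Hadoop Configuration to the Parquet reader and use the s3a:// scheme. A hedged sketch (the bucket/key and the Avro record type are placeholders, not from your code):

```java
// Sketch: configure Hadoop's S3A filesystem explicitly for ParquetReader,
// since the Flink s3 plugin is invisible to org.apache.hadoop.fs.FileSystem.
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hadoopConf.set("fs.s3a.access.key", "test-access-key");
hadoopConf.set("fs.s3a.secret.key", "test-secret-key");
hadoopConf.set("fs.s3a.endpoint", "http://localstack:4566");
hadoopConf.set("fs.s3a.path.style.access", "true"); // LocalStack needs path-style access

Path parquetPath = new Path("s3a://my-bucket/data.parquet"); // placeholder path
ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(parquetPath)
        .withConf(hadoopConf)
        .build();
```

With this approach the reader talks to S3 through Hadoop's S3A client directly, independently of whether the Flink plugin is installed.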