Java-клиент InfluxDB v3 не работает внутри Flink RichSinkFunction с сообщением «Типы адресов NameResolver 'unix' не поддJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Java-клиент InfluxDB v3 не работает внутри Flink RichSinkFunction с сообщением «Типы адресов NameResolver 'unix' не подд

Сообщение Anonymous »

Я использую Java-клиент InfluxDB v3 в задании Apache Flink. Клиент работает в автономном приложении Java, но происходит сбой во время инициализации при использовании внутри Flink RichSinkFunction.
Среда
  • Apache Flink 1.18.1
  • Java 17
  • Java-клиент InfluxDB v3 (внутренне использует gRPC/Arrow Flight)
  • VM RedHat
Минимальный пример
import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.config.ClientConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class InfluxSink extends RichSinkFunction {

private transient InfluxDBClient client;

@Override
public void open(Configuration parameters) {
ClientConfig config = new ClientConfig.Builder()
.host("https://host:8181")
.token("TOKEN".toCharArray())
.database("DB")
.build();

this.client = InfluxDBClient.getInstance(config);
}

@Override
public void invoke(String value, Context context) {
// no-op
}
}

Используется следующим образом:
stream.addSink(new InfluxSink());

Проблема
Инициализация клиента завершается неудачно в open() с этим исключением:
java.lang.IllegalArgumentException: Address types of NameResolver 'unix' not supported by transport

Это происходит до выполнения фактической записи.
Полная трассировка стека
java.lang.IllegalArgumentException: Address types of NameResolver 'unix' for 'myhost.com:8181' not supported by transport
at io.grpc.internal.ManagedChannelImplBuilder.getNameResolverProvider(ManagedChannelImplBuilder.java:871) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:721) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at io.grpc.ForwardingChannelBuilder2.build(ForwardingChannelBuilder2.java:278) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at com.influxdb.v3.client.internal.FlightSqlClient.createFlightClient(FlightSqlClient.java:179) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at com.influxdb.v3.client.internal.FlightSqlClient.(FlightSqlClient.java:102) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at com.influxdb.v3.client.internal.FlightSqlClient.(FlightSqlClient.java:82) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at com.influxdb.v3.client.internal.InfluxDBClientImpl.(InfluxDBClientImpl.java:116) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at com.influxdb.v3.client.internal.InfluxDBClientImpl.(InfluxDBClientImpl.java:97) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at com.influxdb.v3.client.InfluxDBClient.getInstance(InfluxDBClient.java:519) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at sink.InfluxSink.open(InfluxSink.java:46) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693) ~[ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) [ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) [ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [ApacheFlink-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [ApacheFlink-1.0-SNAPSHOT.jar:?]
at java.lang.Thread.run(Thread.java:840) [?:?]

То, что я уже проверил
  • Та же конфигурация клиента Influx DB работает в автономном приложении main() на том же компьютере
  • Ошибка возникает только тогда, когда клиент Influx DB создается внутри Flink
  • Использование хоста: 8181 без схемы вызывает другую ошибку:
java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 9: grpc+tcp:
  • Использование https://localhost:8181 устраняет проблему синтаксического анализа URI, но затем вызывает ошибку NameResolver 'unix', указанную выше.
  • Параллелизм равен 1
Вопрос
Что заставляет gRPC / Arrow Flight разрешать транспорт unix внутри Флинк, а как это можно исправить?
Дополнительная информация
io.grpc:grpc зависимости для Java-клиента InfluxDB
\- com.influxdb:influxdb3-java:jar:1.8.0:compile
\- org.apache.arrow:flight-core:jar:18.3.0:compile
+- io.grpc:grpc-netty:jar:1.71.0:compile
| \- io.grpc:grpc-util:jar:1.71.0:runtime
+- io.grpc:grpc-core:jar:1.71.0:compile
| \- io.grpc:grpc-context:jar:1.71.0:runtime
+- io.grpc:grpc-protobuf:jar:1.71.0:compile
| \- io.grpc:grpc-protobuf-lite:jar:1.71.0:runtime
+- io.grpc:grpc-stub:jar:1.71.0:compile
\- io.grpc:grpc-api:jar:1.71.0:compile
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»