У меня есть интерфейс TransformationService, который имеет несколько реализаций, только одна из которых используется одним заданием Flink. Это определяется во время выполнения, как показано в коде ниже.
Ниже приведена подпись моей службы преобразования —
Код: Выделить всё
public interface TransformationService {
KEY keyBy(EVENT ev);
}
Код: Выделить всё
TransformationService service = getImplementation(pipelineType);
DataStreamSource source = ...;
source
.uid("source")
.rebalance()
...
...
.keyBy(service::keyBy)
...
...
env.execute(job);
Пример реализации TransformationService —
Код: Выделить всё
public class LaunchEventService implements TransformationService {
LaunchEventKey keyBy(LaunchEvent event) {
...
}
}
Код: Выделить всё
org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'KEY' in 'interface TransformationService' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
Но при использовании следующей лямбда-функции вместо ссылки на метод в качестве параметров функции keyBy код компилируется и работает нормально -
Код: Выделить всё
TransformationService service = getImplementation(pipelineType);
DataStreamSource source = ...;
source
.uid("source")
.rebalance()
...
...
.keyBy(ev -> service.keyBy(val))
...
...
env.execute(job);
Подробнее здесь: https://stackoverflow.com/questions/798 ... references
Мобильная версия