Безопасно ли изменять, испускать и делать снимки одного и того же экземпляра состояния оператора в Apache Flink?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Безопасно ли изменять, испускать и делать снимки одного и того же экземпляра состояния оператора в Apache Flink?

Сообщение Anonymous »

Я создаю один глобальный объект топологии в ProcessFunction без ключей с параллелизмом = 1. Я сохраняю его как локальный изменяемый объект и обновляю его для каждого входного события с помощью topology.apply(GnmiEvent). После обновления локальной топологии я вызываю out.collect(topology), чтобы создать текущее представление. Я транслирую этот вывод в нисходящий поток.
  • Если я вызываю out.collect(topology) и позже изменяю один и тот же экземпляр топологии для последующих событий (с помощью метода apply), могут ли нижестоящие (связанные) операторы или приемники наблюдать эти более поздние мутации для ранее созданного объекта? Я думаю, в этом случае все будет в порядке, потому что я транслирую это напрямую? Но что произойдет, если я добавлю еще одного оператора, который не предполагает перетасовку сети? При каких условиях это происходит (цепочка или границы сети/параллелизм)? Нужно ли мне глубоко копировать состояние перед отправкой?
  • Создает ли topologyState.add(topology) в snapshotState() безопасную глубокую копию для обычных серверных частей (HashMapStateBackend/Heap vs RocksDB)? Должен ли я защищать копирование при создании снимков или при отправке?

Код: Выделить всё

public class TopologyProcessFunction
extends ProcessFunction
implements CheckpointedFunction {

private Topology topology;
private transient ListState topologyState;

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor descriptor =
new ListStateDescriptor("topology-state", Topology.class);
topologyState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
topology = getTopologyFromState();
}
}

@Override
public void processElement(GnmiEvent gnmiEvent,
Context ctx,
Collector out) {
if (topology == null) {
topology = new Topology();
}
topology.apply(gnmiEvent);
out.collect(topology);
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (topology != null) {
topologyState.clear();
topologyState.add(topology);
}
}

private Topology getTopologyFromState() throws Exception {
Iterator it = topologyState.get().iterator();
return it.hasNext() ? it.next() : null;
}
}
Единственное, что я нашел, это: https://stackoverflow.com/a/66597952/11922563
Но, будучи новичком во Flink, я не мог понять, действительно ли это отвечает/применимо к моему вопросу.
Заранее спасибо!

Подробнее здесь: https://stackoverflow.com/questions/798 ... nce-in-apa
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»