Я создаю один глобальный объект топологии в ProcessFunction без ключей с параллелизмом = 1. Я сохраняю его как локальный изменяемый объект и обновляю его для каждого входного события с помощью topology.apply(GnmiEvent). После обновления локальной топологии я вызываю out.collect(topology), чтобы создать текущее представление. Я транслирую этот вывод в нисходящий поток.
Если я вызываю out.collect(topology) и позже изменяю один и тот же экземпляр топологии для последующих событий (с помощью метода apply), могут ли нижестоящие (связанные) операторы или приемники наблюдать эти более поздние мутации для ранее созданного объекта? Я думаю, в этом случае все будет в порядке, потому что я транслирую это напрямую? Но что произойдет, если я добавлю еще одного оператора, который не предполагает перетасовку сети? При каких условиях это происходит (цепочка или границы сети/параллелизм)? Нужно ли мне глубоко копировать состояние перед отправкой?
Создает ли topologyState.add(topology) в snapshotState() безопасную глубокую копию для обычных серверных частей (HashMapStateBackend/Heap vs RocksDB)? Должен ли я защищать копирование при создании снимков или при отправке?
public class TopologyProcessFunction
extends ProcessFunction
implements CheckpointedFunction {
private Topology topology;
private transient ListState topologyState;
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor descriptor =
new ListStateDescriptor("topology-state", Topology.class);
topologyState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
topology = getTopologyFromState();
}
}
@Override
public void processElement(GnmiEvent gnmiEvent,
Context ctx,
Collector out) {
if (topology == null) {
topology = new Topology();
}
topology.apply(gnmiEvent);
out.collect(topology);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (topology != null) {
topologyState.clear();
topologyState.add(topology);
}
}
private Topology getTopologyFromState() throws Exception {
Iterator it = topologyState.get().iterator();
return it.hasNext() ? it.next() : null;
}
}
Единственное, что я нашел, это: https://stackoverflow.com/a/66597952/11922563
Но, будучи новичком во Flink, я не мог понять, действительно ли это отвечает/применимо к моему вопросу.
Заранее спасибо!
Я создаю один глобальный объект топологии в ProcessFunction без ключей с параллелизмом = 1. Я сохраняю его как локальный изменяемый объект и обновляю его для каждого входного события с помощью topology.apply(GnmiEvent). После обновления локальной топологии я вызываю out.collect(topology), чтобы создать текущее представление. Я транслирую этот вывод в нисходящий поток. [list] [*]Если я вызываю out.collect(topology) и позже изменяю один и тот же экземпляр топологии для последующих событий (с помощью метода apply), могут ли нижестоящие (связанные) операторы или приемники наблюдать эти более поздние мутации для ранее созданного объекта? Я думаю, в этом случае все будет в порядке, потому что я транслирую это напрямую? Но что произойдет, если я добавлю еще одного оператора, который не предполагает перетасовку сети? При каких условиях это происходит (цепочка или границы сети/параллелизм)? Нужно ли мне глубоко копировать состояние перед отправкой? [*]Создает ли topologyState.add(topology) в snapshotState() безопасную глубокую копию для обычных серверных частей (HashMapStateBackend/Heap vs RocksDB)? Должен ли я защищать копирование при создании снимков или при отправке? [/list] [code]public class TopologyProcessFunction extends ProcessFunction implements CheckpointedFunction {
@Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor descriptor = new ListStateDescriptor("topology-state", Topology.class); topologyState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()) { topology = getTopologyFromState(); } }
@Override public void processElement(GnmiEvent gnmiEvent, Context ctx, Collector out) { if (topology == null) { topology = new Topology(); } topology.apply(gnmiEvent); out.collect(topology); }
@Override public void snapshotState(FunctionSnapshotContext context) throws Exception { if (topology != null) { topologyState.clear(); topologyState.add(topology); } }
private Topology getTopologyFromState() throws Exception { Iterator it = topologyState.get().iterator(); return it.hasNext() ? it.next() : null; } } [/code] Единственное, что я нашел, это: https://stackoverflow.com/a/66597952/11922563 Но, будучи новичком во Flink, я не мог понять, действительно ли это отвечает/применимо к моему вопросу. Заранее спасибо!