Почему Flink States в RockSDB не удерживается со контрольных точек после сбоя времени выполнения?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Почему Flink States в RockSDB не удерживается со контрольных точек после сбоя времени выполнения?

Сообщение Anonymous »

Я запускаю работу для использования для использования информации AdWords Info и пользовательских событий. Мне нужно сохранить эти конкретные рекламные события в состоянии, чтобы присоединиться к данным AdWords, которые приходят поздно. Штаты свежеприемники и хранят события, которые поступают только после контрольно -пропускного пункта. Почему это происходит, я реализую что -то не так. Дайте мне знать? < /p>

Код: Выделить всё

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().enableUnalignedCheckpoints();
Configuration config = new Configuration();
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend().configure(config,null);

env.setStateBackend(rocksDB);
config.setString("state.backend.rocksdb.localdir","/home/flinkJob/Rockdb");

env.getCheckpointConfig().setCheckpointStorage("file:///home/flinkJob/FlinkCheckpoints");
env.setParallelism(1);
env.enableCheckpointing(7500);

KafkaSource source = KafkaSource.builder()
.setBootstrapServers("localhost:9092")
.setTopics("Events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("commit.offsets.on.checkpoint", "true")
.build();

final OutputTag gclidTag = new OutputTag("gclid-events"){};

SingleOutputStreamOperator kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").slotSharingGroup("Slot1")
.process(new StringToJson()).slotSharingGroup("Slot1");

SingleOutputStreamOperator eventStream = kafkaStream
.process(new SplitEvents());

DataStream gclidStream = eventStream.getSideOutput(gclidTag);

DataStreamSink nonGclidStream = eventStream
.addSink(new fileWrite());

DataStream adwordsStream = env.fromData(
createAdwordsEntry("123456", "Campaign A", "Ad 1", "AccountIdA"),
createAdwordsEntry("24567", "Campaign B", "Ad 2", "AccountIdB"),
createAdwordsEntry("789789", "Campaign C", "Ad 3", "AccountIdC"),
createAdwordsEntry("123456", "Campaign A", "Ad 1", "AccountIdA"),
createAdwordsEntry("24567", "Campaign B", "Ad 2", "AccountIdB"),
createAdwordsEntry("789789", "Campaign C", "Ad 3", "AccountIdC"),
createAdwordsEntry("123456", "Campaign A", "Ad 1", "AccountIdA")
).map(new MapFunction(){
@Override
public JSONObject map(JSONObject value) throws Exception {
Thread.sleep(5000);
return value;
}
})
.keyBy(event -> event.getString("gclid"));

DataStreamSink joinedStream = gclidStream
.keyBy(data -> data.getString("gclid"))
.connect(adwordsStream.keyBy(data -> data.getString("gclid")))
.process(new JoinFunction()).slotSharingGroup("Slot6")
.addSink(new fileWrite()).slotSharingGroup("Slot6");

env.execute("Flink 1");
}
< /code>
Это оператор соединения, который обрабатывает соединение и хранение состояний < /p>
public class JoinFunction extends KeyedCoProcessFunction {
private transient MapState gclidState;

@Override
public void open(Configuration parameters) {
MapStateDescriptor  descriptor =
new MapStateDescriptor(
"gclidState",
Types.STRING,
Types.GENERIC(JSONObject.class)
);
gclidState = getRuntimeContext().getMapState(descriptor);
}

@Override
public void processElement1(JSONObject event, Context ctx, Collector out) throws Exception {

String gclid = event.getString("gclid");
gclidState.put(gclid, event);
long midnightTimestamp = getMidnightTimestamp();

try (FileWriter writer = new FileWriter("states.txt", true)) {
for (Map.Entry entry : gclidState.entries()) {
writer.write("\n\n" + entry.getKey() + entry.getValue());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void processElement2(JSONObject event, Context ctx, Collector out) throws Exception {

String gclid = event.getString("gclid");
JSONObject gclidEvent = gclidState.get(gclid);

if (gclidEvent != null) {
formatJoinedData(gclidEvent, event, out);
gclidState.remove(gclid);
}
}
Мне нужно то, что мне нужно изменить в реализации, чтобы задача сохранить прошлые состояния, которые хранятся до неудачи

Подробнее здесь: https://stackoverflow.com/questions/795 ... -runtime-f
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»