Код: Выделить всё
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // --- Checkpointing ------------------------------------------------------------------
    // Checkpoint every 7.5 s, exactly-once, at most one checkpoint in flight.
    env.enableCheckpointing(7500);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // Keep the latest checkpoint when the job is cancelled. NOTE: a retained checkpoint is
    // only used if the job is explicitly resumed from it (e.g. `flink run -s <checkpoint-path>`);
    // a plain re-submission starts with empty state.
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().enableUnalignedCheckpoints();
    env.getCheckpointConfig().setCheckpointStorage("file:///home/flinkJob/FlinkCheckpoints");

    // --- State backend ------------------------------------------------------------------
    // configure() reads the Configuration immediately and returns a new backend instance, so
    // every option must be set on `config` BEFORE configure() is called; options set afterwards
    // never reach the backend.
    Configuration config = new Configuration();
    config.setString("state.backend.rocksdb.localdir", "/home/flinkJob/Rockdb");
    EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend().configure(config, null);
    env.setStateBackend(rocksDB);

    // --- Kafka click events -------------------------------------------------------------
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("Events")
            .setGroupId("my-group")
            // Start from the group's committed offsets; use EARLIEST when none are committed.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperty("commit.offsets.on.checkpoint", "true")
            .build();

    // Side-output tag for gclid events. SplitEvents is constructed without this tag, so it must
    // emit with its own OutputTag of the same id ("gclid-events") and element type; side outputs
    // are matched by tag id. The anonymous subclass ({}) lets Flink capture the element type.
    final OutputTag<JSONObject> gclidTag = new OutputTag<JSONObject>("gclid-events") {};

    // NOTE(review): assumes StringToJson and SplitEvents both emit JSONObject — confirm against
    // their class declarations.
    SingleOutputStreamOperator<JSONObject> kafkaStream = env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
            .slotSharingGroup("Slot1")
            .process(new StringToJson())
            .slotSharingGroup("Slot1");

    SingleOutputStreamOperator<JSONObject> eventStream = kafkaStream.process(new SplitEvents());
    DataStream<JSONObject> gclidStream = eventStream.getSideOutput(gclidTag);

    // Main (non-side-output) stream of SplitEvents goes straight to the file sink.
    eventStream.addSink(new fileWrite());

    // --- Bounded adwords reference data -------------------------------------------------
    // Keyed only once, at the connect() below (keying it here as well was redundant).
    DataStream<JSONObject> adwordsStream = env.fromData(
                    createAdwordsEntry("123456", "Campaign A", "Ad 1", "AccountIdA"),
                    createAdwordsEntry("24567", "Campaign B", "Ad 2", "AccountIdB"),
                    createAdwordsEntry("789789", "Campaign C", "Ad 3", "AccountIdC"),
                    createAdwordsEntry("123456", "Campaign A", "Ad 1", "AccountIdA"),
                    createAdwordsEntry("24567", "Campaign B", "Ad 2", "AccountIdB"),
                    createAdwordsEntry("789789", "Campaign C", "Ad 3", "AccountIdC"),
                    createAdwordsEntry("123456", "Campaign A", "Ad 1", "AccountIdA"))
            .map(new MapFunction<JSONObject, JSONObject>() {
                @Override
                public JSONObject map(JSONObject value) throws Exception {
                    // Deliberate 5 s delay per record — presumably to make the adwords data
                    // arrive after the Kafka clicks; remove outside of experiments.
                    Thread.sleep(5000);
                    return value;
                }
            });

    // --- Join clicks with adwords data by gclid -----------------------------------------
    gclidStream
            .keyBy(data -> data.getString("gclid"))
            .connect(adwordsStream.keyBy(data -> data.getString("gclid")))
            .process(new JoinFunction()).slotSharingGroup("Slot6")
            .addSink(new fileWrite()).slotSharingGroup("Slot6");

    env.execute("Flink 1");
}
</code>
Ниже — оператор соединения (JoinFunction): он выполняет соединение потоков и хранит их состояние.</p>
public class JoinFunction extends KeyedCoProcessFunction {
private transient MapState gclidState;
@Override
public void open(Configuration parameters) {
    // Keyed MapState: map key is the gclid, value is the buffered click event for that gclid.
    MapStateDescriptor<String, JSONObject> descriptor =
            new MapStateDescriptor<>(
                    "gclidState",
                    Types.STRING,
                    // NOTE(review): Types.GENERIC means JSONObject values are serialized with Kryo.
                    // Storing the JSON text (Types.STRING) instead would make the checkpointed
                    // state independent of JSONObject's internal layout.
                    Types.GENERIC(JSONObject.class));
    gclidState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void processElement1(JSONObject event, Context ctx, Collector out) throws Exception {
    // Buffer the click event under its gclid until a matching adwords record arrives
    // (see processElement2). A later click with the same gclid replaces the earlier one.
    String gclid = event.getString("gclid");
    gclidState.put(gclid, event);

    // TODO(review): buffered clicks are never expired. If daily cleanup is intended, register a
    // timer for getMidnightTimestamp() via ctx.timerService() and clear gclidState in onTimer().

    // Debug dump, appended (UTF-8) to states.txt on every element. The path is relative, so it
    // resolves against the working directory of the process running this operator.
    // Keyed state is scoped to the current key, and the input is keyed by gclid, so entries()
    // only ever yields the entry for `gclid` — not the operator's whole state.
    try (java.io.Writer writer = java.nio.file.Files.newBufferedWriter(
            java.nio.file.Paths.get("states.txt"),
            java.nio.charset.StandardCharsets.UTF_8,
            java.nio.file.StandardOpenOption.CREATE,
            java.nio.file.StandardOpenOption.APPEND)) {
        for (Map.Entry<String, JSONObject> entry : gclidState.entries()) {
            writer.write("\n\n" + entry.getKey() + entry.getValue());
        }
    }
}
@Override
public void processElement2(JSONObject event, Context ctx, Collector out) throws Exception {
    // Look up the buffered click that shares this adwords record's gclid.
    final String key = event.getString("gclid");
    final JSONObject bufferedClick = gclidState.get(key);
    if (bufferedClick == null) {
        // NOTE(review): nothing is buffered for this gclid yet, and the adwords record itself is
        // not stored — so adwords records that arrive before their click never join.
        return;
    }
    formatJoinedData(bufferedClick, event, out);
    // The buffered click is consumed by the first matching adwords record.
    gclidState.remove(key);
}
Подробнее здесь: https://stackoverflow.com/questions/795 ... -runtime-f