im using flink 1.81.1 api on java 11 and im trying to use a
Код: Выделить всё
BroadcastProcessFunctionSo my first products Datastream contains different products that has a field brand and my second brands Datastream contains only brands that should be allowed .
The problem is that when my products comes to
Код: Выделить всё
processElementКод: Выделить всё
BroadcastProcessFunctionКод: Выделить всё
processElementКод: Выделить всё
brandStateКод: Выделить всё
BroadcastProcessFunctionКод: Выделить всё
public class GateCoProcess extends BroadcastProcessFunction {
private final MapStateDescriptor broadcastStateDescriptor;
public GateCoProcess(MapStateDescriptor broadcastStateDescriptor) {
this.broadcastStateDescriptor = broadcastStateDescriptor;
}
@Override
public void processElement(CrawlData value, ReadOnlyContext ctx, Collector out) throws Exception {
ReadOnlyBroadcastState brandState = ctx.getBroadcastState(broadcastStateDescriptor);
if (brandState.contains(value.data.product.brand)) {
out.collect(value);
}
}
@Override
public void processBroadcastElement(Brand brand, Context ctx, Collector out) throws Exception {
BroadcastState brandState = ctx.getBroadcastState(broadcastStateDescriptor);
if (brand.active) {
brandState.put(brand.getName(), true);
}
}
}
Код: Выделить всё
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream
brands = env.fromSource(KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");
MapStateDescriptor broadcastStateDescriptor = new MapStateDescriptor(
"broadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO);
BroadcastStream
broadcastStream = brands.broadcast(broadcastStateDescriptor);
// integration is the products Datastream
DataStream integration = ExtractData.extractProducts(env);
DataStream filtered = integration.connect(broadcastStream).process(new GateCoProcess(broadcastStateDescriptor));
env.execute("mon job de products");
i tried using watermarks but with no result , my classes are not with timestamps
Источник: https://stackoverflow.com/questions/781 ... tion-flink
Мобильная версия