How to synchronize data streams in a Flink BroadcastProcessFunction?


Posted by Guest


I'm using the Flink 1.18.1 API on Java 11, and I'm trying to use a BroadcastProcessFunction to filter a products DataStream, with a DataStream of authorized brands as the broadcast side.

My first DataStream contains products, each with a brand field; my second DataStream contains only the brands that should be allowed.

The problem is that when products reach processElement of the BroadcastProcessFunction, brandState has not yet been filled with all the records from the brands DataStream. For example, the brands DataStream has 4800 brands, but when products arrive in processElement, brandState contains only a few of them (about 200). As a result, products are dropped even though their brands are allowed, because those brands have not been loaded into brandState yet.

Here is my BroadcastProcessFunction:

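Code:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {
    private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;

    public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
        this.broadcastStateDescriptor = broadcastStateDescriptor;
    }

    // Product side: forward a product only if its brand is already in the broadcast state.
    @Override
    public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out) throws Exception {
        ReadOnlyBroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);

        if (brandState.contains(value.data.product.brand)) {
            out.collect(value);
        }
    }

    // Broadcast side: remember every active brand by name.
    @Override
    public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out) throws Exception {
        BroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
        if (brand.active) {
            brandState.put(brand.getName(), true);
        }
    }
}
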
And here are my DataStreams and the call to the function:

Code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Brands arrive from Kafka (CDC records) without watermarks.
DataStream<Brand> brands = env.fromSource(KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");

MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
        "broadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.BOOLEAN_TYPE_INFO);

BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);

// integration is the products DataStream
DataStream<CrawlData> integration = ExtractData.extractProducts(env);

DataStream<CrawlData> filtered = integration.connect(broadcastStream).process(new GateCoProcess(broadcastStateDescriptor));

env.execute("mon job de products");

What should I do to get around this problem? I tried using watermarks, but with no result; my classes have no timestamps. Thanks.
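
The race described above can be pictured with a small model. This is only a sketch in plain Java with no Flink dependency, and it is not the Flink API: the class BufferingGate and its methods onProduct and onBrand are hypothetical names that stand in for processElement and processBroadcastElement. It shows one possible workaround, which is to buffer a product whose brand is not known yet and release it when that brand arrives, instead of dropping it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java model (hypothetical, not Flink) of a gate that buffers products until their brand is known. */
class BufferingGate {
    // Stands in for the broadcast state: names of brands seen as active.
    private final Set<String> activeBrands = new HashSet<>();
    // Products whose brand has not been seen yet, grouped by brand name.
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Analogue of processElement: emit if the brand is known, otherwise buffer. */
    void onProduct(String productId, String brand, List<String> out) {
        if (activeBrands.contains(brand)) {
            out.add(productId);
        } else {
            pending.computeIfAbsent(brand, b -> new ArrayList<>()).add(productId);
        }
    }

    /** Analogue of processBroadcastElement: record the brand and flush waiting products. */
    void onBrand(String brand, boolean active, List<String> out) {
        if (!active) {
            return; // mirrors the code in the question: inactive brands are ignored
        }
        activeBrands.add(brand);
        List<String> waiting = pending.remove(brand);
        if (waiting != null) {
            out.addAll(waiting);
        }
    }

    public static void main(String[] args) {
        BufferingGate gate = new BufferingGate();
        List<String> out = new ArrayList<>();
        gate.onProduct("p1", "acme", out);   // brand not loaded yet: buffered
        gate.onBrand("acme", true, out);     // brand arrives: p1 is released
        gate.onProduct("p2", "acme", out);   // brand known: emitted immediately
        gate.onProduct("p3", "other", out);  // brand never arrives: stays buffered
        System.out.println(out);
    }
}
```

In this model the demo prints [p1, p2]. Two caveats apply when carrying the idea over to Flink. First, a plain BroadcastProcessFunction has no keyed state, so a per-brand buffer would more likely live in a KeyedBroadcastProcessFunction on a products stream keyed by brand. Second, products of brands that never arrive stay buffered forever here, so a real job would probably need some eviction rule, such as a timeout.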


Source: https://stackoverflow.com/questions/781 ... tion-flink