Соединение Kafka Stream-GlobalKTable по значениямJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Соединение Kafka Stream-GlobalKTable по значениям

Сообщение Anonymous »

`Итак, у меня есть KStream, который десериализуется в POJO вот так -
public class Payment {
public String user_id;
public String state_code;
public String exchange_id;

}
Ниже показана запись Global Ktable, в которой есть переводы для state_code
public class Государство{
public String field_name
public String source_value;

}
Я хочу объединить kstream с этой глобальной таблицей Ktable на основе значения из Payment.State_code = State.field_name, и общих ключей нет. в обоих случаях.
Я могу присоединиться к ним, но не с помощью значений.
Класс-оболочка -
общедоступный класс PaymentStateWrapper{
public String Payment
public String State;

}
KStream PaymentStream =
builder.stream(
INCOMING_TOPIC,
Consumed .with(Serdes.String(), PaymentStreamSerde)
);
GlobalKTable stateStore =
builder.globalTable(
KTABLE_TOPIC,
Consumed.with(Serdes.String(), stateSerde)
);

KStream stream = paymentStream.leftJoin(
stateStore,
(Key, Value) -> Value.state_code,
(paymentValue, StateValue) -> new PaymentStateJoiner()
);`


Подробнее здесь: https://stackoverflow.com/questions/786 ... -on-values
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»