Мне нужно создать выходные данные для 4 разных файлов в моем конвейере Apache Beam. Я буду использовать очень упрощенный случай, чтобы защитить основную работу, но здесь необходимо сохранить общую структуру. Я хотел бы взять неограниченную коллекцию из темы Kafka и опубликовать ее в 4 разных файлах. Неограниченный сбор будет организован в оконном режиме с минимальным интервалом между пакетами данных в 30 минут. Для каждого оконного набора данных я бы хотел, чтобы один файл содержал все записи со номером больше 100, мы назовем этот файл Greater.dat, один файл будет содержать все записи со номером меньше или равным 100, мы назовем этот файл less.dat, а затем файл сводки, который для каждого значения big.summ и less.sum содержит количество записей в каждом. Именно этого я и ожидал от следующего потока данных
Код: Выделить всё
... greater than 30 minutes since last data
10
20
120
110
30
40
... 30 minutes elapse
greater.dat
120
110
less.dat
10
20
30
40
greater.sum
2
less.sum
4
Это то, что у меня есть на данный момент
Код: Выделить всё
public class TupleTagContainer {
public static final TupleTag greaterThan
= new TupleTag() {};
public static final TupleTag lessThan
= new TupleTag() {};
}
public class LTGTSplitter extends DoFn {
@ProcessElement
public void processElement (
@Element Integer elem,
MultiOutputReceiver out
) {
Integer copy = new Integer(elem);
if (copy
Подробнее здесь: [url]https://stackoverflow.com/questions/79344407/output-pcollections-to-multiple-files-in-apache-beam[/url]