Вот мой код: < /p>
Код: Выделить всё
BatchStage batch1= pipeline.readFrom(companyListBatchSource);
BatchStage batch2= pipeline.readFrom(employeeListBatchSource);
//Getting group by key
BatchStageWithKey jdbcGroupByKey = batch1.groupingKey(a -> a.getSource1().get(col1));
BatchStageWithKey fileGroupByKey = batch2.groupingKey(b -> b.getSource1().get(col2));
BatchStage d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(),fileGroupByKey,AggregateOperations.toList());
BatchStage jdbcBatchStageData = d.filter(h -> {
return !h.getValue().f0().isEmpty() && !h.getValue().f1().isEmpty();
}).map(e -> {
try {
List list = new ArrayList();
e.getValue().f0().forEach(z -> {
if (e.getValue().f1().size() > 0) {
e.getValue().f1().forEach(z1 -> {
List a = new ArrayList();
a.addAll((List)z);
a.addAll((List)z1);
list.add(a);
});
}
});
return list;
} catch (Exception e1) {
return null;
}
});
< /code>
Это работает нормально, но если есть большие данные, то он выходит из PF Memory из -за этой строки: < /p>
BatchStage d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(),fileGroupByKey,AggregateOperations.toList());
будет здорово, если кто -то сможет помочь.
Подробнее здесь: https://stackoverflow.com/questions/795 ... elcast-jet