Код: Выделить всё
public class MedianUDAF2 extends AggregateFunction {
public static class State {
public int scale = 2;
public ListView numbers;
public State() {}
}
@Override
public State createAccumulator() {
State state = new State();
state.numbers = new ListView();
return state;
}
public void accumulate(State acc, Double val, Integer scale) throws Exception {
acc.numbers.add(val);
if (scale != null && scale > 0) acc.scale = scale;
}
public void merge(State acc, Iterable it) throws Exception {
for (State a : it) {
acc.numbers.addAll(a.numbers.getList());
}
}
@Override
public Double getValue(State acc) {
try {
List numbers = acc.numbers.getList();
numbers.sort(Double::compareTo);
double n = numbers.size() - 1;
double index = n * 0.5;
int low = (int) Math.floor(index);
int high = (int) Math.ceil(index);
double value = low == high ? (numbers.get(low) + numbers.get(high)) * 0.5 : numbers.get(high);
BigDecimal decimal = new BigDecimal(value);
return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
} catch (Exception ignored) {
}
return 0.0;
}
}
Код: Выделить всё
public class MedianUDAF extends AggregateFunction {
public static class State {
public int scale = 2;
@DataTypeHint(value = "ARRAY")
public ArrayList numbers;
public State() {}
}
@Override
public State createAccumulator() {
State state = new State();
state.numbers = new ArrayList();
return state;
}
public void accumulate(State acc, Double val, Integer scale) throws Exception {
acc.numbers.add(val);
if (scale != null && scale > 0) acc.scale = scale;
}
public void merge(State acc, Iterable it) throws Exception {
for (State a : it) {
acc.numbers.addAll(a.numbers);
}
}
@Override
public Double getValue(State acc) {
try {
List numbers = acc.numbers;
numbers.sort(Double::compareTo);
double n = numbers.size() - 1;
double index = n * 0.5;
int low = (int) Math.floor(index);
int high = (int) Math.ceil(index);
double value = low == high ? (numbers.get(low) + numbers.get(high)) * 0.5 : numbers.get(high);
BigDecimal decimal = new BigDecimal(value);
return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
} catch (Exception ignored) {
}
return 0.0;
}
}
Код: Выделить всё
tableEnvironment.createTemporarySystemFunction("median", new MedianUDAF()); // Or new MedianUDAF2()
Table table = tableEnvironment.sqlQuery("select median(l_linenumber, 2) from lineitem");
В комментариях к ListView говорится, что он будет использовать серверную часть состояния при работе с большими объемами данных. До версии Flink-table-planner 1.14 addAccumulatorDataViews в AggregationCodeGenerator мог видеть этот процесс преобразования, но в версии 1.20 он больше не виден.
Я пытался отладить этот процесс преобразования в классе AggsHandlerCodeGenerator, но все равно не удалось. ,
Могу ли я спросить, где произошел этот процесс преобразования и как мне следует наблюдать это явление? Спасибо, спасибо!!!
Подробнее здесь: https://stackoverflow.com/questions/790 ... -arraylist
Мобильная версия