Ниже приведен код, который я написал функцию UDF для Upserts массива Mongo
Код: Выделить всё
UserDefinedFunction mergeArrays = functions.udf((WrappedArray array1, WrappedArray array2) -> {
Map arrayKeyMap = new HashMap();
List array1List = JavaConverters.seqAsJavaList(array1);
List array2List = JavaConverters.seqAsJavaList(array2);
String key1 = arrayFieldUniqueKeys.get(0);
String key2 = arrayFieldUniqueKeys.size() > 1 ? arrayFieldUniqueKeys.get(1) : "-1";
for (Row row : array1List) {
arrayKeyMap.put(new Tuple2(row.getAs(key1), key2.equals("-1") ? key2 : row.getAs(key2)), row);
}
List mergedList = new ArrayList();
mergedList.addAll(array2List);
for (Row row : array2List) {
Tuple2 key = new Tuple2(row.getAs(key1), key2.equals("-1") ? key2 : row.getAs(key2));
if (!arrayKeyMap.containsKey(key)) {
mergedList.add(row);
} else {
int index = 0;
for (Row mrow : mergedList) {
Tuple2 mRowKey = new Tuple2(mrow.getAs(key1), key2.equals("-1") ? key2 : mrow.getAs(key2));
if (mRowKey.equals(key)) {
break;
}
index++;
}
mergedList.set(index, arrayKeyMap.get(key));
}
}
return mergedList;
}, dataType);
Код: Выделить всё
"arrayFields": "LegalAddress",
"arrayFieldsUniqueKey": "LegalAddress\~LocationGroupID,LegalAddress\~AddressType",
Подробнее здесь: https://stackoverflow.com/questions/784 ... java-spark