Как итерация через набор данных Spark и обновить значение столбца на Java?JAVA

Программисты JAVA общаются здесь
Anonymous
Как итерация через набор данных Spark и обновить значение столбца на Java?

Сообщение Anonymous »

Я работаю над POC, где мне нужно обновить номер учетной записи в БД с токенами. Я прочитал данные в DSRecords набора данных (около 2 млн записей). У меня есть еще одна рутина, которая запечатлела различные номера счетов и получил токены, отображение хранится в Hashmap. < /p>

Dataset applySwappedTokens(Dataset dsRecords, Map mappedTokens){
}
< /code>

Теперь я должен выполнить через набор данных, чтобы сделать следующее -
1. ЧИТАЕТ НОМЕР СЧЕТА Столбец (AccountNumber) Значение и обновление (я знаю, что набор данных неизменен. Таким образом, обновление набора данных означает создание копии набора данных с обновленными строками) его со значением токена из MapedTokens. Это может быть достигнуто с помощью Join или других операций, но я не трачу на это из -за 2 -й задачи.
2. Прочтите еще один столбец Blob Blob и найдите номер учетной записи и обновите его. Нессердечный код. Большинство онлайн -ресурсов находятся в Scala, а не на Java. Пожалуйста, помогите. < /P>

spark 2.1
java 8 < /p>

rate1 - не удастся проверить Из -за ошибки сериализации. < /strong> < /p>

Dataset output = sparkSession.sqlContext().createDataFrame(dsRecords.javaRDD().map(row -> {
return RowFactory.create(row.get(0), row.get(1), row.get(2), swapToken(row.get(3)),row.get(4));
}), dsRecords.schema());

return output;

String swapToken(Object inputToken) {
return mappedTokens.get(inputToken);//mappedToken will have to be instance field.
}
< /code>

подход2- Неполный. < /strong> < /p>

dsRecords.foreach((ForeachFunction) row -> {
Integer index = row.fieldIndex("accountNumber");
String pan = row.getString(index);
String swap = this.swapToken(pan);
//TODO: create a dataset with rows from dsRecords but swap value.

});
< /code>

подход 3 - используйте UDF с функцией карты < /strong> < /p>

Создать UDF2 (что принимает 2 входных параметров, а именно. Кажется, что UDF может принимать только значения столбцов < /p>

update 1 - udf < /strong>
Итак, я реализовал UDF (AFK, опубликую код позже):
1. Определил UDF1 «UpdateTetoken», чтобы передать значение столбца XML и вернуть обновленное значение XML.
2. Экземпляр HashMap «MapenTokens», который имеет сопоставление пары с точки зрения аккаунта, сделано статичным. Доступ к моей функции UDF, чтобы найти учетную запись в строке XML и обновить с помощью токена. < /p>

Я мог бы проверить свою функцию ApplyswAppApdTokens, которая вызывает вышеуказанный UDF в наборе данных ‘withColumn». Однако, когда я запускаю программу Spark, я вижу, что «MapedToken» имеет «нулевые» данные, и, следовательно, столбец XML обновляется пустыми данными. Я думаю, что статическое «отображаемыетокенс» находится либо в другом JVM, либо в драйвере (даже в локации Spark создает изолированный драйвер, исполнитель). Обидно, что нет простого решения для итерации и обновления строк в Spark. < /P>

Dataset processByRow(Dataset dsRecords, SparkSession sparkSession) {
sparkSession.udf().register("updateToken", updateToken, DataTypes.StringType);
return ds = dsRecords.withColumn("eventRecordTokenText", callUDF("updateToken", dsRecords.col("eventRecordTokenText")));
}

static UDF1 updateToken = new UDF1() {
public String call(final String tokenText) throws Exception {
// xml operations here..
for (int nodeIndex = 0; nodeIndex < nList.getLength(); nodeIndex++) {
Node thisNode = nList.item(nodeIndex);
if (thisNode.getAttributes().getNamedItem("ProcessTokenValue") != null && thisNode.getAttributes()
.getNamedItem("ProcessTokenValue").getNodeValue().equalsIgnoreCase("true")) {
Node valueNode = thisNode.getAttributes().getNamedItem("Value");
String thisToken = valueNode.getNodeValue();
String newToken = mappedTokens.get(thisToken); // *returns null values from the map*
if(newToken != null && !newToken.isEmpty())
valueNode.setNodeValue(newToken);
}
}
// more xml operations here..
return output;
}
};
< /code>

Обновление 2 - Итатерация и обновление < /strong>
Теперь я пробую строку, пройдя строк .. < /p>

Dataset processByRow1(Dataset dsRecords, SparkSession sparkSession) {
List newRows = new ArrayList();
dsRecords.foreach((ForeachFunction) record -> {
String currentToken = record.getAs(AppConstants.TokenCol);
String newToken = mappedTokens.get(currentToken);
newRows.add(new MongoRecordSmall(record.getString(0), record.getString(1), newToken, record.getString(3)));
logger.error(“Size plus=“+newRows.size());
});
return sparkSession.createDataFrame(newRows, MongoRecordSmall.class);
}
< /code>

Это выбрасывает ошибку Serliazation. Похоже (https://databricks.gitbooks.io/databric ... ption.html) мой класс, где существует выше .

Подробнее здесь: https://stackoverflow.com/questions/493 ... ue-in-java

Вернуться в «JAVA»