Как перебрать набор данных Spark и обновить значение столбца в Java?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как перебрать набор данных Spark и обновить значение столбца в Java?

Сообщение Anonymous »

Я работаю над POC, где мне нужно обновить номер счета в БД с помощью токенов. Я читаю данные в набор данных dsRecords (около 2 миллионов записей). У меня есть еще одна процедура, которая захватила отдельные номера счетов и получила токены, сопоставление хранится в HashMap.

Код: Выделить всё

Dataset applySwappedTokens(Dataset dsRecords, Map mappedTokens){
}
Теперь мне нужно перебрать набор данных, чтобы сделать следующее:
1. Прочитайте значение столбца номера счета (accountNumber) и обновите его (я знаю, что набор данных неизменяем. Таким образом, обновление набора данных означает создание копии набора данных с обновленными строками) со значением токена из MappedTokens. Этого можно добиться с помощью JOIN или других операций, но я не трачу на это усилий из-за второй задачи.
2. Прочитайте другой столбец XML-объекта, найдите номер учетной записи и обновите его.

Все варианты, которые я пробовал до сих пор, приводят к ошибке времени компиляции или ошибке компиляции теста из-за несериализуемого кода. Большинство онлайн-ресурсов написаны на Scala, а не на Java. Пожалуйста, помогите.

Spark 2.1
Java 8

Подход 1 – не удалось протестировать из-за ошибки сериализации.

Код: Выделить всё

Dataset output = sparkSession.sqlContext().createDataFrame(dsRecords.javaRDD().map(row ->  {
return RowFactory.create(row.get(0), row.get(1), row.get(2), swapToken(row.get(3)),row.get(4));
}), dsRecords.schema());

return output;

String swapToken(Object inputToken) {
return mappedTokens.get(inputToken);//mappedToken will have to be instance field.
}
Подход 2 – неполный.

Код: Выделить всё

dsRecords.foreach((ForeachFunction) row -> {
Integer index = row.fieldIndex("accountNumber");
String pan = row.getString(index);
String swap = this.swapToken(pan);
//TODO: create a dataset with rows from dsRecords but swap value.

});
Подход 3. Используйте UDF с функцией карты

Создайте UDF2 (который принимает 2 входных параметра, а именно accountNumber и MappedToken, и возвращает токен). Кажется, UDF может принимать только значения столбцов

обновление 1 - UDF
Итак, я реализовал UDF (AFK, код опубликую позже):
1. Определен UDF1 «updateToken» для передачи значения столбца XML и возврата обновленного значения XML.
2. Экземпляр HashMap «mappedTokens», имеющий сопоставление пары учетная запись-токен, становится статическим. Доступ к нему осуществляется внутри моей функции UDF, чтобы найти учетную запись в строке xml и обновить ее с помощью токена.

Я мог бы протестировать свою функцию applySwappedTokens, которая вызывает указанную выше UDF для набора данных withColumn. Однако когда я запускаю программу Spark, я вижу, что «mappedToken» имеет «нулевые» данные, и, следовательно, столбец xml обновляется пустыми данными. Я думаю, что статические «mappedTokens» находятся либо в другой JVM, либо в драйвере (даже в локальном режиме искра создает изолированный драйвер-исполнитель). Разочаровывает отсутствие простого решения для перебора и обновления строк в Spark.

Код: Выделить всё

Dataset processByRow(Dataset dsRecords, SparkSession sparkSession) {
sparkSession.udf().register("updateToken", updateToken, DataTypes.StringType);
return ds = dsRecords.withColumn("eventRecordTokenText", callUDF("updateToken", dsRecords.col("eventRecordTokenText")));
}

static UDF1 updateToken = new UDF1() {
public String call(final String tokenText) throws Exception {
// xml operations here..
for (int nodeIndex = 0; nodeIndex < nList.getLength(); nodeIndex++) {
Node thisNode = nList.item(nodeIndex);
if (thisNode.getAttributes().getNamedItem("ProcessTokenValue") != null && thisNode.getAttributes()
.getNamedItem("ProcessTokenValue").getNodeValue().equalsIgnoreCase("true")) {
Node valueNode = thisNode.getAttributes().getNamedItem("Value");
String thisToken = valueNode.getNodeValue();
String newToken = mappedTokens.get(thisToken); // *returns null values from the map*
if(newToken != null && !newToken.isEmpty())
valueNode.setNodeValue(newToken);
}
}
// more xml operations here..
return output;
}
};
обновление 2 — итерация и обновление
Теперь я пробую обход по строкам..

Код: Выделить всё

Dataset processByRow1(Dataset dsRecords, SparkSession sparkSession) {
List newRows = new ArrayList();
dsRecords.foreach((ForeachFunction) record -> {
String currentToken = record.getAs(AppConstants.TokenCol);
String newToken = mappedTokens.get(currentToken);
newRows.add(new MongoRecordSmall(record.getString(0), record.getString(1), newToken, record.getString(3)));
logger.error(“Size plus=“+newRows.size());
});
return sparkSession.createDataFrame(newRows, MongoRecordSmall.class);
}
Выдает ошибку серлиации. Кажется (https://databricks.gitbooks.io/databric ... ption.html), мой класс, в котором существует вышеуказанная логика, серлализуется и отправляется на рабочие узлы, но при этом не получается.

Подробнее здесь: https://stackoverflow.com/questions/493 ... ue-in-java
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»