Как конвертировать javaRDD в набор данныхJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как конвертировать javaRDD в набор данных

Сообщение Anonymous »

Я пытаюсь использовать Spark, считываю данные из базы данных Oracle в набор данных, затем преобразую набор данных в javaRDD для работы с картой, мой код может хранить только набор данных, который показывают официальные документы Spark: http://spark.apache.org/docs/latest/sql ... reflection



// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset peopleDF = spark.createDataFrame(peopleRDD, Person.class);


Наши данные читаются из Oracle. Как определить Person.class для хранения и преобразования rdd в набор данных? Или как с помощью Java напрямую выполнять операцию с картой для набора данных?
Что мне делать, если мой код такой?

import java.math.BigDecimal;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class FlatMapTest {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.sql.warehouse.dir", "./spark-warehouse");
SparkSession spark = SparkSession.builder().master("local[3]").config(conf)
.appName("Java Spark SQL data sources example").getOrCreate();

jdbcDataSource(spark);

}

public static void jdbcDataSource(SparkSession spark) {
// 连接到数据库,获得DF对象,DF对象封装了数据库的表信息数据
Dataset jdbcDF = spark.read().format("jdbc")
.option("driver", "oracle.jdbc.driver.OracleDriver")
.option("url", "jdbc:oracle:thin:@192.168.101.207:1521:orcl")
.option("dbtable", "datamask")
.option("user", "scott")
.option("password", "tiger").load();
/*
* 创建临时表 datamask
*/
jdbcDF.createOrReplaceTempView("datamask");
Dataset sqlDF = spark.sql("select * from datamask");

JavaRDD resultRDD = sqlDF.toJavaRDD().map(
new Function() {

public String call(Row row) throws Exception {
Random ran = new Random();
int r = ran.nextInt(9001) + 1000;
/*
* 将每个字段的后4位替换为随机数,替换规则可以自己设定
*/
String userName = row.getAs("USER_NAME");
userName = StringUtils.replace(userName,
StringUtils.right(userName, 4), "" + r);

String loginName = row.getAs("LOGIN_NAME");// LOGIN_NAME为数据库字段名,下同
loginName = StringUtils.replace(loginName,
StringUtils.right(loginName, 4), "" + r);
String countyCode = row.getAs("COUNTY_CODE");
countyCode = StringUtils.replace(countyCode,
StringUtils.right(countyCode, 4), "" + r);
String passwd = row.getAs("PASSWORD");
passwd = StringUtils.replace(passwd,
StringUtils.right(passwd, 4), "" + r);
String areaId = row.getAs("AREA_ID");
areaId = StringUtils.replace(areaId,
StringUtils.right(areaId, 4), "" + r);
String cityNo = row.getAs("CITY_NO");
cityNo = StringUtils.replace(cityNo,
StringUtils.right(cityNo, 4), "" + r);
String cardID = row.getAs("CARD_ID");
cardID = StringUtils.replace(cardID,
StringUtils.right(cardID, 4), "" + r);
String mobile = row.getAs("MOBILE");
mobile = StringUtils.replace(mobile,
StringUtils.right(mobile, 4), "" + r);
String email = row.getAs("EMAIL");
email = StringUtils.replace(email,
StringUtils.right(email, 4), "" + r);
BigDecimal big = row.getAs("QQ");
String qq = big.toString();
qq = StringUtils.replace(qq, StringUtils.right(qq, 4),
"" + r);
String addr = row.getAs("ADDR");
addr = StringUtils.replace(addr,
StringUtils.right(addr, 4), "" + r);
String birthday = row.getAs("BIRTHDAY");
birthday = StringUtils.replace(birthday,
StringUtils.right(birthday, 4), "" + r);
String birthday1 = row.getAs("BIRTHDAY1");
birthday1 = StringUtils.replace(birthday1,
StringUtils.right(birthday1, 4), "" + r);
String codeId = row.getAs("CODE_ID");
codeId = StringUtils.replace(codeId,
StringUtils.right(codeId, 4), "" + r);
String deptNo = row.getAs("DEPT_NO");
deptNo = StringUtils.replace(deptNo,
StringUtils.right(deptNo, 4), "" + r);
String newCode = row.getAs("NEW_CODE");
newCode = StringUtils.replace(newCode,
StringUtils.right(newCode, 4), "" + r);
String oldCode = row.getAs("OLD_CODE");
oldCode = StringUtils.replace(oldCode,
StringUtils.right(oldCode, 4), "" + r);
return loginName + "," + countyCode + "," + passwd
+ "," + areaId + "," + cityNo + "," + cardID
+ "," + mobile + "," + email + "," + qq + ","
+ addr + "," + birthday + "," + "," + birthday1
+ "," + codeId + "," + deptNo + "," + newCode
+ "," + oldCode;
}

});
Dataset peopleDF = spark.createDataFrame(resultRDD, Object.class);

String url2 = "jdbc:oracle:thin:@192.168.101.207:1521:orcl";
Properties connectionProperties2 = new Properties();
connectionProperties2.setProperty("user", "scott");// 设置用户名
connectionProperties2.setProperty("password", "tiger");// 设置密码
String table2 = "masked1";

peopleDF.write().mode(SaveMode.Append)
.jdbc(url2, table2, connectionProperties2);

}
}


Подробнее здесь: https://stackoverflow.com/questions/415 ... to-dataset
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»