Hhanwen 2020-05-08
// Reading data with Spark
// DataFrameReader.textFile(String...) only accepts paths and returns Dataset<String>
// (one element per line of input).
Dataset<String> lines = spark.read().textFile(currentSrcPath);
// text(...) returns a Dataset<Row> with a single string column named "value".
Dataset<Row> textDf = spark.read().text(currentSrcPath);
// A (path, minPartitions) pair is only accepted by the RDD API (SparkContext.textFile).
JavaRDD<String> lineRdd = spark.sparkContext().textFile(currentSrcPath, 1).toJavaRDD();
Dataset<Row> df = spark.read().json(path);
// Distinct variable names so all of these reads can coexist in one scope.
Dataset<Row> orcDf = spark.read().orc(path);
Dataset<Row> parquet = spark.read().parquet(path);
// Writing data with Spark
// NOTE: all three writes target the same outputPath in "overwrite" mode, so each one replaces
// whatever the previous one wrote; treat them as alternative output formats, not a sequence.
// NOTE(review): text(...) only supports a DataFrame with a single string column; a multi-column
// df (e.g. the json(...) read above) must be reduced to one string column first.
df.write().mode("overwrite").text(outputPath);
df.write().mode("overwrite").parquet(outputPath);
df.write().mode("overwrite").orc(outputPath);
// RDD -> Dataset<Row>: apply an explicit StructType schema to an RDD of Row objects.
// Named rowDf because df is already declared by the read examples above.
// NOTE(review): rowRDD must hold Row objects for this overload; AdjustSchema.row is presumably
// the StructType field `row` defined in these notes — confirm.
Dataset<Row> rowDf = spark.createDataFrame(rowRDD, AdjustSchema.row);
// List -> Dataset<String>: a one-element Dataset holding startTime as a string.
Dataset<String> dataset = spark.createDataset(
        Collections.singletonList(Long.toString(startTime)), Encoders.STRING());

// Get the Hadoop FileSystem from Spark's Hadoop configuration.
// FileSystem.get declares IOException, so the enclosing method must handle or declare it.
FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
//构建schema
public static StructType row = DataTypes.createStructType(
Arrays.asList(
DataTypes.createStructField("phone_name", StringType, true),
DataTypes.createStructField("app_id", StringType, true)
...
));//rdd/javaRDD转DataFrame(Dataset<Row>)
// Bean-class overload: builds the DataFrame from Person objects.
Dataset<Row> personDF = spark.createDataFrame(personRDD, Person.class);
// Schema overload: keep the returned DataFrame; createDataFrame does not modify anything in place.
// NOTE(review): this overload expects an RDD/JavaRDD of Row, so personRDD must hold Row objects
// here, while the bean-class overload above expects Person objects — confirm which one applies.
personDF = spark.createDataFrame(personRDD, PersonSchema);
// The bean-class overload also accepts a JavaRDD directly.
personDF = spark.createDataFrame(personJavaRDD, Person.class);
// RDD -> Dataset<Person>
// Bean encoder for Person; the Dataset<Row> -> Dataset<Person> example further down reuses it.
Encoder<Person> personEncoder = Encoders.bean(Person.class);
personDS = spark.createDataset(
        personJavaRDD.rdd(), // createDataset takes a Scala RDD, hence .rdd()
        personEncoder);
//list直接构建Dataset
Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);
// JavaRDD<Row> -> Dataset<Row>
// Each Row carries (age, name) in that order; rowAgeNameSchema presumably declares the same
// field order — confirm.
JavaRDD<Row> ageNameRows = personJavaRDD.map(p -> {
    Row r = RowFactory.create(p.age, p.name);
    return r;
});
personDF = spark.createDataFrame(ageNameRows, rowAgeNameSchema);
// Dataset<Person> -> JavaRDD<Person>
// javaRDD() performs the same conversion as toJavaRDD().
personJavaRDD = personDS.javaRDD();
//Dataset<Row> -> JavaRDD<Person>
personJavaRDD = personDF.toJavaRDD().map(row -> {
String name = row.getAs("name");
int age = row.getAs("age");
return new Person(name, age);
});
// Dataset<Person> -> Dataset<Row>
// NOTE(review): RowEncoder is an internal Catalyst API; newer Spark releases expose
// Encoders.row(schema) as the public alternative — check availability for your Spark version.
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
Dataset<Row> personDF_fromDS = personDS.map(
        // The cast picks the Java MapFunction overload of Dataset.map. RowFactory.create is
        // varargs, so no intermediate list is needed; values are (name, age), which presumably
        // matches rowSchema's field order — confirm.
        (MapFunction<Person, Row>) person -> RowFactory.create(person.name, person.age),
        rowEncoder
);
//Dataset<Row> -> Dataset<Person>
personDS = personDF.map(new MapFunction<Row, Person>() {
@Override
public Person call(Row value) throws Exception {
return new Person(value.getAs("name"), value.getAs("age"));
}
}, personEncoder);