Hhanwen 2020-05-08
//spark读取数据 Dataset<Row> df = spark.read().textFile(currentSrcPath, 1); Dataset<Row> df = spark.read().json(path); Dataset<Row> df = spark.read().orc(path); Dataset<Row> parquet = spark.read().parquet(path); //spark写入数据 df.write().mode("overwrite").text(outputPath); df.write().mode("overwrite").parquet(outputPath); df.write().mode("overwrite").orc(outputPath); //rdd转Dataset<Row> Dataset<Row> df = spark.createDataFrame(rowRDD, AdjustSchema.row); //list转Dataset Dataset<String> dataset = spark.createDataset(Collections.singletonList(Long.toString(startTime)), Encoders.STRING());
//从spark获取hadoop FileSystem FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
//构建schema public static StructType row = DataTypes.createStructType( Arrays.asList( DataTypes.createStructField("phone_name", StringType, true), DataTypes.createStructField("app_id", StringType, true) ... ));
//rdd/javaRDD转DataFrame(Dataset<Row>) Dataset<Row> personDF = spark.createDataFrame(personRDD, Person.class); spark.createDataFrame(personRDD, PersonSchema); personDF = spark.createDataFrame(personJavaRDD, Person.class); //rdd转Dataset Encoder<Person> personEncoder = Encoders.bean(Person.class); personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder); //list直接构建Dataset Dataset<Row> personDF = spark.createDataFrame(personList, Person.class); //JavaRDD<Row>转Dataset<Row> JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name)); personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema); //Dataset<Person> -> JavaRDD<Person> personJavaRDD = personDS.toJavaRDD(); //Dataset<Row> -> JavaRDD<Person> personJavaRDD = personDF.toJavaRDD().map(row -> { String name = row.getAs("name"); int age = row.getAs("age"); return new Person(name, age); }); //Dataset<Person> -> Dataset<Row> ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema); Dataset<Row> personDF_fromDS = personDS.map( (MapFunction<Person, Row>) person -> { List<Object> objectList = new ArrayList<>(); objectList.add(person.name); objectList.add(person.age); return RowFactory.create(objectList.toArray()); }, rowEncoder ); //Dataset<Row> -> Dataset<Person> personDS = personDF.map(new MapFunction<Row, Person>() { @Override public Person call(Row value) throws Exception { return new Person(value.getAs("name"), value.getAs("age")); } }, personEncoder);