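// 登峰小蚁 2020-04-23
// 练习 Spark Streaming 监听 socket 端口 (Exercise: Spark Streaming reading lines from a socket port)
// 手写 WordCount Java 代码 (Hand-written WordCount in Java)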
package com.swust.streaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class TestSparkStreaming { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local[2]").setAppName("stream"); JavaSparkContext jsc = new JavaSparkContext(conf); // jsc.setLogLevel("error"); JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(5000)); //监听端口 JavaReceiverInputDStream<String> lines = ssc.socketTextStream("data005", 9999); // word count JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { String[] splits = s.split(" "); return Arrays.asList(splits).iterator(); } }); JavaPairDStream<String, Integer> wordRdd = (JavaPairDStream<String, Integer>) words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { String key = word; int value = 1; Tuple2<String, Integer> tp = new Tuple2<>(key, value); return tp; } }); JavaPairDStream<String, Integer> resultRdd = wordRdd.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }); resultRdd.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() { @Override public void call(JavaPairRDD<String, Integer> pairRDD, Time time) throws Exception { 
pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> tp) throws Exception { System.out.println(tp._1+"-----------------"+tp._2); } }); } }); ssc.start(); try { ssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } ssc.stop(false); } }