Flink学习(三) 批流版本的wordcount Scala版本

yuchuanchen 2020-05-11

批处理代码:

package com.wyh.wc

import org.apache.flink.api.scala._

/**
  * 批处理代码
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建一个批处理的一个环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputPath = "D:\\shujia\\shujia006\\FlinkWyh\\src\\main\\data\\word"

    val inputDataSet = env.readTextFile(inputPath)

    //分词之后做count
    val wordcountSet = inputDataSet
      .flatMap(lines => lines.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    //打印
    wordcountSet.map(x => {
      x._1 + " " + x._2
    }).print()


  }

}

流处理代码:

package com.wyh.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //创建一个流处理的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //为了host和port不写死,flink提供了一个方法
    val params = ParameterTool.fromArgs(args)

//    val host = params.get("host")
//
//    val port = params.getInt("port")

    //env.disableOperatorChaining()//全局打散  一个算子一个任务
    //每一个算子也会有个方法  .disableChaining() 将这个算子单独拿出来
    //还有个方法.startNewChain() 将当前算子之前面和后面 分开

    //部署到集群中接收socket数据流
//    val dataStream: DataStream[String] = env.socketTextStream(host, port)

    //接收socket数据流
    val dataStream = env.socketTextStream("localhost", 9999)

    //逐一读取数据,打散进行WordCount
    val wordCountStream = dataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordCountStream.print().setParallelism(1)


    //比批处理多一个步骤
    //真正执行这个任务,启动它的Executor
    env.execute("WordCountStream")


  }

}

相关推荐

登峰小蚁 / 0评论 2020-01-10