yuchuanchen 2020-06-16
1. flink简介
1.1 什么是flink
Apache Flink是一个分布式大数据处理引擎,可以对有限数据流(如离线数据)和无限流数据及逆行有状态计算(不太懂)。可以部署在各种集群环境,对各种大小的数据规模进行快速计算。

1.2 flink的架构体系
具体见文档
2. flink的安装
jobmanager.rpc.address: feng05 // 注意,此处冒号后需要空一格,并且参数要顶行写(yaml文件格式的规定,否则会报错) taskmanager.numberOfTaskSlots: 2
for i in {4..7}; do scp -r flink-1.10.1/ feng05:$PWD; donebin/start-cluster.sh
StandaloneSessionClusterEntrypoint (JobManager,即Master) TaskManagerRunner (TaskManager,即Worker)
feng05:8081
3. flink提交任务的两种方式
第一种:通过web页面提交


第二种:通过命令行提交
./flink run -m feng05:8081 -p 4 -c cn._51doit.flink.day1.HelloFlink /root/flink-in-action-1.0-SNAPSHOT.jar --hostname feng05 --port 8888
4.flink快速入门
4.0 创建flink工程
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DgroupId=cn._51doit.flink -DartifactId=flink-java -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false
同理
4.1 wordCount案例
StreamWordCount(匿名内部类的形式)
package cn._51doit.flink.day01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 创建一个Stream计算执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 调用Source创建DataStream
DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));
int parallelism = lines.getParallelism();
// DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
// @Override
// public void flatMap(String line, Collector<String> out) throws Exception {
// String[] words = line.split(" ");
// for (String word : words) {
// out.collect(word);
// }
// }
// });
// SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
// @Override
// public Tuple2<String, Integer> map(String word) throws Exception {
// return Tuple2.of(word, 1);
// }
// });
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
//Transformation 结束
//调用Sink
summed.print();
//执行程序
env.execute("StreamWordCount");
}
}LambdaStreamWordCount(lambda的形式)
package cn._51doit.flink.day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class LambdaStreamWordCount {
public static void main(String[] args) throws Exception {
// 创建一个stream计算的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
// SingleOutputStreamOperator<String> words = lines
// .flatMap((String line, Collector<String> out) -> Arrays.asList(line.split(" ")).forEach(out::collect))
// .returns(Types.STRING);
//
// SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words
// .map(word -> Tuple2.of(word, 1))
// .returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap((String line, Collector<Tuple2<String, Integer>> out)->{
Arrays.stream(line.split(" ")).forEach(w -> out.collect(Tuple2.of(w, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.print();
env.execute();
}
}自己运行时遇到的小问题

5.source
只有一个source来产生数据,如fromCollection、socketTextStream
有多个source实例来产生数据
6 常用算子
6.1 keyBy

package cn._51doit.flink.day01;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByDemo1 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//辽宁省,沈阳市,1000
SingleOutputStreamOperator<Tuple3<String, String, Double>> provinceCityAndMoney = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
@Override
public Tuple3<String, String, Double> map(String line) throws Exception {
String[] fields = line.split(",");
String province = fields[0];
String city = fields[1];
double money = Double.parseDouble(fields[2]);
return Tuple3.of(province, city, money);
}
});
KeyedStream<Tuple3<String, String, Double>, Tuple> keyed = provinceCityAndMoney.keyBy(0, 1);
SingleOutputStreamOperator<Tuple3<String, String, Double>> summed = keyed.sum(2);
summed.print();
env.execute();
}
}package cn._51doit.flink.day01;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByDemo2 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//辽宁省,沈阳市,1000
SingleOutputStreamOperator<OrderBean> provinceCityAndMoney = lines.map(new MapFunction<String, OrderBean>() {
@Override
public OrderBean map(String line) throws Exception {
String[] fields = line.split(",");
String province = fields[0];
String city = fields[1];
double money = Double.parseDouble(fields[2]);
return new OrderBean(province, city, money);
}
});
KeyedStream<OrderBean, Tuple> keyed = provinceCityAndMoney.keyBy("province", "city");
SingleOutputStreamOperator<OrderBean> res = keyed.sum("money");
//provinceCityAndMoney.keyBy(OrderBean::getProvince) 只能按照一个字段分组
res.print();
env.execute();
}
}6.2 max和min
min、max返回分组的字段和参与比较的数据,如果有多个字段,其他字段的返回值是第一次出现的数据。
package cn._51doit.flink.day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MinMaxDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//省份,城市,人数
SingleOutputStreamOperator<Tuple3<String, String, Integer>> provinceCityAmount = lines.map(line -> {
String[] fields = line.split(",");
String province = fields[0];
String city = fields[1];
Integer amount = Integer.parseInt(fields[2]);
return Tuple3.of(province, city, amount);
}).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));
KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = provinceCityAmount.keyBy(0);
//min、max返回分组的字段和参与比较的数据,如果有多个字段,其他字段的返回值是第一次出现的数据。
SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = keyed.max(2);
max.print();
env.execute();
}
}比如
江西,鹰潭,1000 //先输入此数据,max后得到本身 江西,南昌,2000 //输入该数据,max后得到的是江西,鹰潭,2000 并不能得到南昌字段
解决办法=====>使用maxBy和minBy
6.2 maxBy和minBy
package cn._51doit.flink.day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MinByMaxByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//省份,城市,人数
江西,鹰潭,1000
江西,南昌,2000
SingleOutputStreamOperator<Tuple3<String, String, Integer>> provinceCityAmount = lines.map(line -> {
String[] fields = line.split(",");
String province = fields[0];
String city = fields[1];
Integer amount = Integer.parseInt(fields[2]);
return Tuple3.of(province, city, amount);
}).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));
KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = provinceCityAmount.keyBy(0);
//minBy、maxBy返回最大值或最小值数据本身(全部字段都返回)。
SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = keyed.maxBy(2);
max.print();
env.execute();
}
}这种形式又会出现另外一个难点,就是当按照key进行分组后,比较大小的值一样时,其它字段返回的值又是第一次出现的数据,解决办法===>加一个参数(可以从源码中得出此结论),如下:加上false
SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = keyed.maxBy(2, false);
此时其它字段返回的值就是最后依次出现的字段了。