运行源码 我们将运行1.13.0版本的Flink,其scala环境为2.12
Step1. 获取学习项目 1 git clone  https://github.com/fightinggg/flink-src-study.git --recursive 
 
在这个项目中,笔者把flink源码作为了一个git submodule放置于文件夹flink中,用来临时查看,当然我个人不建议看这些代码,因为这个文件夹太大了,IDE都不能很好的处理他。
然后就可以直接运行了
 
Step2. Enjoy It 现在你可以直接从这里进入flink的控制台http://localhost:8081 , 你也可以直接在ideal中调试flink。
Step3. Debug 自己设断点就好了。
使用Flink 下面使用flink-examples-streaming_2.12-1.13.0来演示
执行环境 这个包下的所有的example在main函数的第一行全部首先选择获取环境,代码如下。
1 env = StreamExecutionEnvironment.getExecutionEnvironment(); 
 
数据源 当我们获取运行环境以后,紧接着就需要拿到数据源,examples中的各个例子获取数据源的方案如下。
方案 
example 
 
 
从数组获取 
1. WordCount 2. WindowWordCount … 
 
从文件按行获取 
1. WordCount 2. WindowWordCount … 
 
从自定义Source获取 
1. TopSpeedWindowing 2. KafkaEventsGeneratorJob … 
 
从Kafka获取 
1. StateMachineExample … 
 
从Socket获取 
1. SocketWindowWordCount … 
 
从集合获取 
1. WindowJoin … 
 
算子 第一个问题就是:什么是算子?
算子描述了一系列的计算操作,他告诉计算机一个数据应该如何处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 graph LR     %% style   classDef green fill:#a3e4d7,stroke:#333,stroke-width:1px   classDef blue fill:#d6eaf8,stroke:#333,stroke-width:1px   classDef brown fill:#edbb99,stroke:#333,stroke-width:1px   classDef grey fill:#f2f3f4,stroke:#333,stroke-width:1px      %% point   start((数据源)):::green   op1(算子1):::blue   op2(算子1):::blue   op3(算子1):::blue   op4(算子2):::blue   op5(算子2):::blue   op6(算子2):::blue   output((输出)):::brown   shuffle((shuffle)):::grey      %% edge   start --> op1 & op2 & op3 --- shuffle --> op4 & op5 & op6 --> output    
 
一旦我们有了数据源以后,数据源源源不断的产生数据,我们可以把它当作一个流,可以进行计算了,DataStream被flatMap以后是SingleOutputStreamOperator,实际上这个类和DataSream区别并不是特别大,SingleOutputStreamOperator继承自DataStream且没有重写任何函数。
KeyedStream则提供了一些聚合函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 graph LR   %% style   classDef green fill:#a3e4d7,stroke:#333,stroke-width:1px      %% point   DataStream(DataStream<br/>数据源):::green   SingleOutputStreamOperator(SingleOutputStreamOperator<br/>简单的输出流):::green   KeyedStream(KeyedStream<br/>被Key分组的流):::green      %% edge   DataStream -->|flatMap| SingleOutputStreamOperator   DataStream -->|keyBy| KeyedStream    
 
更具体一点,如wordCount,他经过flatMap分词,然后使用词进行Key,最后聚合,代码如下。
1 2 3 4 5 6 DataStream<Tuple2<String, Integer>> counts =          text.flatMap(new  Tokenizer ())          .keyBy(value -> value.f0)     .sum(1 ); 
 
窗口 当然复杂一点点的如WindowWordCount,中间穿插了一个计数窗口,代码如下。
1 2 3 4 5 6 7 8 DataStream<Tuple2<String, Integer>> counts =          text.flatMap(new  WordCount .Tokenizer())          .keyBy(value -> value.f0)     .countWindow(windowSize, slideSize)          .sum(1 ); 
 
Socket数据源 最复杂的SocketWindowWordCount,首先执行nc -l 12345,然后启动此类的main函数,nc可以直接输入,我们能发每5秒输出了一次实时计算结果,代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 DataStream<WordWithCount> windowCounts =     text.flatMap(     new  FlatMapFunction <String, WordWithCount>() {         @Override          public  void  flatMap (              String value, Collector<WordWithCount> out)  {            for  (String word : value.split("\\s" )) {                 out.collect(new  WordWithCount (word, 1L ));             }         }     })     .keyBy(value -> value.word)     .window(TumblingProcessingTimeWindows.of(Time.seconds(5 )))     .reduce(     new  ReduceFunction <WordWithCount>() {         @Override          public  WordWithCount reduce (WordWithCount a, WordWithCount b)  {             return  new  WordWithCount (a.word, a.count + b.count);         }     }); 
 
异步数据源 首先提出一个背景,有一条来自Kafka的数据,由于某些原因,该数据中暂时不包含完整的字段,当我们使用Flink从Kafka读取数据以后,还需要查询Mysql补全其字段,此后才能使用Flink做接下来的操作。
如果自定义数据源,从Kafka消费数据,然后查询Mysql,最后输出,这其中其实涉及到很多问题,一个最简单的想法是从Kafka单线程消费数据,然后阻塞住,去查询Mysql。
1 2 3 4 5 6 7 8 9 10 11 sequenceDiagram   rect rgb(0, 0, 255, .1)       loop            DataStream ->> + Kafka : 拉取数据           Kafka --) - DataStream : 返回数据           DataStream ->> + Mysql : 查询未知字段           Mysql --) - DataStream : 返回未知字段           DataStream ->> + out : 产生一条数据           out --) - DataStream : 数据生产成功       end   end 
 
这样做无可厚非,但是效率堪忧,Kafka拉取数据要快于Mysql,所以补全字段以及输出结果可以异步完成,基于第二个点,引入了RichAsyncFunction。
1 2 3 4 5 6 7 8 9 10 sequenceDiagram   rect rgb(0, 0, 255, .1)       loop            DataStream ->> + Kafka : 拉取数据           Kafka --) - DataStream : 返回数据           DataStream -) + 线程池 : 后续工作委托给线程池        end   end   线程池 ->> - 线程池 : 完成剩下的工作    
 
当然RichAsyncFunction做的工作不仅仅是这些,实际上处理流程也比这个复杂很多,这里从中挑几个出来聊一聊。
首先是顺序问题,由于后续工作委托给了线程池,线程池内部当然可以并发执行,那么我们就没办法保证有哪些数据先处理完毕,Async I/O 给出的第一个解决方案是通过队列保证顺序,哪个任务先执行完我不管,最终按入队顺序取结果;第二个解决方案是完全不理会顺序,谁先执行完就取出谁的结果;第三个解决方案是关注watermark,对于当前watermark下的数据,执行完就直接取出结果,对于下一个watermark的数据,将其缓存,直到他的watermark抵达。读者可以在这里看到更加详细的过程http://wuchong.me/blog/2017/05/17/flink-internals-async-io/ 
GPU计算 MatrixVectorMul是一个GPU计算例子,其中主要的内容在Multiplier中。这里 不做过多介绍。
Iterator模型 试想,如果有一些元素需要进行迭代计算,比如说我们计算两个元素进行斐波拉契数列的第n项,是不是可以写一个递归?
1 2 3 int  fib (int  a,int  b,int  n)  {    return  n<=0  ? b : fib (b,a+b,n-1 ); } 
 
如果某些算子也需要进行这些操作,我们可以使用ProcessFunction, 下面这个代码和上面的代码的思想异曲同工。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public  void  processElement (     Tuple5<Integer, Integer, Integer, Integer, Integer> value,     Context ctx,     Collector<Tuple5<Integer, Integer, Integer, Integer, Integer>> out)     throws  Exception {     Tuple5<Integer, Integer, Integer, Integer, Integer> element =         new  Tuple5 <>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);     if  (value.f2 < BOUND && value.f3 < BOUND) {         ctx.output(ITERATE_TAG, element);     } else  {         out.collect(element);     } } 
 
WindowJoin模型 参考SQL语法中的Join操作,两个stream将按照指定的key进行聚合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  static  DataStream<Tuple3<String, Integer, Integer>> runWindowJoin (     DataStream<Tuple2<String, Integer>> grades,     DataStream<Tuple2<String, Integer>> salaries,     long  windowSize)  {    return  grades.join(salaries)         .where(new  NameKeySelector ())         .equalTo(new  NameKeySelector ())         .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))         .apply(         new  JoinFunction <         Tuple2<String, Integer>,         Tuple2<String, Integer>,         Tuple3<String, Integer, Integer>>() {             @Override              public  Tuple3<String, Integer, Integer> join (                  Tuple2<String, Integer> first, Tuple2<String, Integer> second)  {                return  new  Tuple3 <String, Integer, Integer>(                     first.f0, first.f1, second.f1);             }         }); }