Flink原理与实践-DataStream-API的介绍和使用课件.pptx
- 【下载声明】
1. 本站全部试题类文档,若标题没写含答案,则无答案;标题注明含答案的文档,主观题也可能无答案。请谨慎下单,一旦售出,不予退换。
2. 本站全部PPT文档均不含视频和音频,PPT中出现的音频或视频标识(或文字)仅表示流程,实际无音频或视频文件。请谨慎下单,一旦售出,不予退换。
3. 本页资料《Flink原理与实践-DataStream-API的介绍和使用课件.pptx》由用户(晟晟文业)主动上传,其收益全归该用户。163文库仅提供信息存储空间,仅对该用户上传内容的表现方式做保护处理,对上传内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(点击联系客服),我们立即给予删除!
4. 请根据预览情况,自愿下载本文。本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
5. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007及以上版本和PDF阅读器,压缩文件请下载最新的WinRAR软件解压。
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- Flink 原理 实践 DataStream API 介绍 使用 课件
- 资源描述:
-
1、第四章DataStream API的介绍和使用Flink程序的骨架结构1.初始化运行环境2.读取一到多个Source数据源3.根据业务逻辑对数据流进行Transformation转换4.将结果输出到Sink5.调用作业执行函数执行环境是作业与集群交互的入口设置并行度关闭算子链时间、Checkpoint流处理和批处理的执行环境不一样Java、Scala两套API设置执行环境/创建Flink执行环境 StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParalleli
2、sm(2);env.disableOperatorChaining();Source、Transformation和SinkSource读取数据源统称为Source文件系统、消息队列、数据库等Transformation使用Flink提供的各类函数,进行有状态的计算数据流的分组、窗口和聚合操作等Sink将计算结果输出到外部系统,统称为Sink目的地可以是文件系统、消息队列、数据库等Flink是延迟执行(Lazy Evaluation)的调用execute()方法,Flink才会真正执行否则无法得到计算结果字符串参数为当前作业名执行/executeenv.execute(kafka stream
3、ing word count);l 单数据流转换l 基于Key的分组转换l 多数据流转换l 数据重分布转换l DataStream 泛型T为数据流中每个元素的类型四类Tranformation转换l 每个输入元素对应一个输出元素l 重写MapFunction或RichMapFunctionl MapFunction T为输入类型O为输出类型l 实现其中的map()虚方法l 主逻辑中调用该函数单数据流转换-mapFunctionalInterface public interface MapFunction extends Function,Serializable /调用这个API就是继承并实
4、现这个虚函数 O map(T value)throws Exception;/第一个泛型是输入类型,第二个泛型是输出类型 public static class DoubleMapFunction implements MapFunction Override public String map(Integer input)return function input:+input+,output:+(input*2);DataStream functionDataStream=dataStream.map(new DoubleMapFunction();MapFunction源代码一个MapF
5、unction的实现l 直接继承接口类并实现map虚方法上页所示l 使用匿名类l 使用Lambda表达式单数据流转换-map/匿名类 DataStream anonymousDataStream=dataStream.map(new MapFunction()Override public String map(Integer input)throws Exception return anonymous function input:+input+,output:+(input*2););/使用Lambda表达式 DataStream lambdaStream=dataStream.map(
6、input-lambda input:+input+,output:+(input*2);匿名类实现MapFunctionLambda表达式实现MapFunctionl 对输入元素进行过滤l 继承并实现FilterFunction或RichFilterFunctionl 重写filter虚方法l True 保留l False 过滤单数据流转换-filterDataStream dataStream=senv.fromElements(1,2,-3,0,5,-9,8);/使用-构造Lambda表达式 DataStream lambda=dataStream.filter(input-input
7、0);public static class MyFilterFunction extends RichFilterFunction /limit参数可以从外部传入 private Integer limit;public MyFilterFunction(Integer limit)this.limit=limit;Override public boolean filter(Integer input)return input this.limit;Lambda表达式实现FilterFunction实现FilterFunctionl 与map()相似l 输出零个、一个或多个元素l 可对列表
8、结果展平单数据流转换-flatMap苹果,梨,香蕉.map(去皮)去皮苹果,去皮梨,去皮香蕉 mapflatMap苹果,梨,香蕉.flatMap(切碎)苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1 苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1l 使用Lambda表达式l Collector用来收集元素flatMap()虚方法中不使用return返回数据,使用Collector收集返回数据Collector中的泛型String为返回数据类型l 将flatMap()看做map()和filter()更一般的形式map()和filter()的语义更明确单数据流转
9、换-flatMapDataStream dataStream=senv.fromElements(Hello World,Hello this is Flink);/split函数的输入为 Hello World 输出为 Hello 和 World 组成的列表 Hello,World/flatMap将列表中每个元素提取出来/最后输出为 Hello,World,Hello,this,is,Flink DataStream words=dataStream.flatMap(String input,Collector collector)-for(String word:input.split()
10、collector.collect(word);).returns(Types.STRING);l 数据分组后可进行聚合操作l keyBy()将一个DataStream转化为一个KeyedStreaml 聚合操作将KeyedStream转化为DataStreaml KeyedStream继承自DataStream基于Key的分组转换l 根据某种属性或数据的某些字段对数据进行分组l 对一个分组内的数据进行处理l 股票:相同股票代号的数据分组到一起l 相同Key的数据被分配到同一算子实例上l 需要指定Key数字位置字段名KeySelector基于Key的分组转换-keyByDataStreamTu
11、ple2 dataStream=senv.fromElements(Tuple2.of(1,1.0),Tuple2.of(2,3.2),Tuple2.of(1,5.5),Tuple2.of(3,10.0),Tuple2.of(3,12.5);/使用数字位置定义Key 按照第一个字段进行分组 DataStreamTuple2 keyedStream=dataStream.keyBy(0).sum(1);l KeySelectorl 重写getKey()方法单数据流转换-keyBy/IN为数据流元素,KEY为所选择的Key FunctionalInterface public interface
12、KeySelector extends Function,Serializable /选择一个字段作为Key KEY getKey(IN value)throws Exception;public class Word public String word;public int count;/使用KeySelector DataStream keySelectorStream=wordStream.keyBy(new KeySelector()Override public String getKey(Word in)return in.word;).sum(count);KeySelecto
13、r源码一个KeySelector的实现l sum()、max()、min()等l 指定字段,对该字段进行聚合KeySelectorl 流数据上的聚合实时不断输出到下游状态存储中间数据单数据流转换 Aggregationsl 将某个字段加和l 结果保存到该字段上l 不关心其他字段的计算结果单数据流转换 sumDataStreamTuple3 tupleStream=senv.fromElements(Tuple3.of(0,0,0),Tuple3.of(0,1,1),Tuple3.of(0,2,2),Tuple3.of(1,0,6),Tuple3.of(1,1,7),Tuple3.of(1,0,
14、8);/按第一个字段分组,对第二个字段求和,打印出来的结果如下:/(0,0,0)/(0,1,0)/(0,3,0)/(1,0,6)/(1,1,6)/(1,1,6)DataStreamTuple3 sumStream=tupleStream.keyBy(0).sum(1);l max()对该字段求最大值结果保存到该字段上不保证其他字段的计算结果l maxBy()对该字段求最大值其他字段保留最大值元素的值单数据流转换 max/maxByDataStreamTuple3 tupleStream=senv.fromElements(Tuple3.of(0,0,0),Tuple3.of(0,1,1),Tu
15、ple3.of(0,2,2),Tuple3.of(1,0,6),Tuple3.of(1,1,7),Tuple3.of(1,0,8);/按第一个字段分组,对第三个字段求最大值max,打印出来的结果如下:/(0,0,0)/(0,0,1)/(0,0,2)/(1,0,6)/(1,0,7)/(1,0,8)DataStreamTuple3 maxStream=tupleStream.keyBy(0).max(2);/按第一个字段分组,对第三个字段求最大值maxBy,打印出来的结果如下:/(0,0,0)/(0,1,1)/(0,2,2)/(1,0,6)/(1,1,7)/(1,0,8)DataStreamTup
16、le3 maxByStream=tupleStream.keyBy(0).maxBy(2);l 比Aggregation更通用l 在KeyedStream上生效l 接受两个输入,生成一个输出l 两两合一地汇总操作基于Key的分组转换-reducel 实现ReduceFunction基于Key的分组转换-reducepublic static class MyReduceFunction implements ReduceFunction Override public Score reduce(Score s1,Score s2)return Score.of(s1.name,Sum,s1.s
17、core+s2.score);DataStream dataStream=senv.fromElements(Score.of(Li,English,90),Score.of(Wang,English,88),Score.of(Li,Math,85),Score.of(Wang,Math,92),Score.of(Liu,Math,91),Score.of(Liu,English,87);/实现ReduceFunction DataStream sumReduceFunctionStream=dataStream.keyBy(name).reduce(new MyReduceFunction(
18、);/使用 Lambda 表达式 DataStream sumLambdaStream=dataStream.keyBy(name).reduce(s1,s2)-Score.of(s1.name,Sum,s1.score+s2.score);l 将多个同类型的DataStream合并为一个DataStreaml 数据按照先进先出(FIFO)合并多数据流转换-unionDataStream shenzhenStockStream=.DataStream hongkongStockStream=.DataStream shanghaiStockStream=.DataStream unionSto
19、ckStream=shenzhenStockStream.union(hongkongStockStream,shanghaiStockStream);l 只能连接两个DataStream数据流l 两个数据流类型可以不一致l 两个DataStream经过connect()之后转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态l 应用场景为:使用一个控制流对另一个数据流进行控制多数据流转换-connectl 重写CoMapFunction或CoFlatMapFunctionl 三个泛型,分别对应第一个输入流的数据类型
20、、第二个输入流的数据类型和输出流的数据类型l 对于CoFlatMapFunction,flatMap1()方法处理第一个流的数据,flatMap2()方法处理第二个流的数据l 可以做到类似SQL Join的效果多数据流转换-connect/IN1为第一个输入流的数据类型/IN2为第二个输入流的数据类型/OUT为输出类型 public interface CoFlatMapFunction extends Function,Serializable /处理第一个流的数据 void flatMap1(IN1 value,Collector out)throws Exception;/处理第二个流的
21、数据 void flatMap2(IN2 value,Collector out)throws Exception;/CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出 public static class MyCoMapFunction implements CoMapFunction Override public String map1(Integer input1)return input1.toString();Override public String map2(String input2)return input2;CoFlatMapF
22、unction源代码一个CoFlatMapFunction实现l 并行度逻辑视图中的算子被切分为多个算子子任务每个算子子任务处理一部分数据可以在整个作业的执行环境层面设置也可以对某个算子单独设置并行度StreamExecutionEnvironment senv=StreamExecutionEnvironment.getExecutionEnvironment();/获取当前执行环境的默认并行度 int defaultParalleism=senv.getParallelism();/设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4 senv.setParallelism(4);
展开阅读全文