书签 分享 收藏 举报 版权申诉 / 39
上传文档赚钱

类型Flink原理与实践-DataStream-API的介绍和使用课件.pptx

  • 上传人(卖家):晟晟文业
  • 文档编号:5218050
  • 上传时间:2023-02-17
  • 格式:PPTX
  • 页数:39
  • 大小:1.24MB
  • 【下载声明】
    1. 本站全部试题类文档,若标题没写含答案,则无答案;标题注明含答案的文档,主观题也可能无答案。请谨慎下单,一旦售出,不予退换。
    2. 本站全部PPT文档均不含视频和音频,PPT中出现的音频或视频标识(或文字)仅表示流程,实际无音频或视频文件。请谨慎下单,一旦售出,不予退换。
    3. 本页资料《Flink原理与实践-DataStream-API的介绍和使用课件.pptx》由用户(晟晟文业)主动上传,其收益全归该用户。163文库仅提供信息存储空间,仅对该用户上传内容的表现方式做保护处理,对上传内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(点击联系客服),我们立即给予删除!
    4. 请根据预览情况,自愿下载本文。本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
    5. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007及以上版本和PDF阅读器,压缩文件请下载最新的WinRAR软件解压。
    配套讲稿:

    如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。

    特殊限制:

    部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。

    关 键  词:
    Flink 原理 实践 DataStream API 介绍 使用 课件
    资源描述:

    1、第四章DataStream API的介绍和使用Flink程序的骨架结构1.初始化运行环境2.读取一到多个Source数据源3.根据业务逻辑对数据流进行Transformation转换4.将结果输出到Sink5.调用作业执行函数执行环境是作业与集群交互的入口设置并行度关闭算子链时间、Checkpoint流处理和批处理的执行环境不一样Java、Scala两套API设置执行环境/创建Flink执行环境 StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParalleli

    2、sm(2);env.disableOperatorChaining();Source、Transformation和SinkSource读取数据源统称为Source文件系统、消息队列、数据库等Transformation使用Flink提供的各类函数,进行有状态的计算数据流的分组、窗口和聚合操作等Sink将计算结果输出到外部系统,统称为Sink目的地可以是文件系统、消息队列、数据库等Flink是延迟执行(Lazy Evaluation)的调用execute()方法,Flink才会真正执行否则无法得到计算结果字符串参数为当前作业名执行/executeenv.execute(kafka stream

    3、ing word count);l 单数据流转换l 基于Key的分组转换l 多数据流转换l 数据重分布转换l DataStream 泛型T为数据流中每个元素的类型四类Tranformation转换l 每个输入元素对应一个输出元素l 重写MapFunction或RichMapFunctionl MapFunction T为输入类型O为输出类型l 实现其中的map()虚方法l 主逻辑中调用该函数单数据流转换-mapFunctionalInterface public interface MapFunction extends Function,Serializable /调用这个API就是继承并实

    4、现这个虚函数 O map(T value)throws Exception;/第一个泛型是输入类型,第二个泛型是输出类型 public static class DoubleMapFunction implements MapFunction Override public String map(Integer input)return function input:+input+,output:+(input*2);DataStream functionDataStream=dataStream.map(new DoubleMapFunction();MapFunction源代码一个MapF

    5、unction的实现l 直接继承接口类并实现map虚方法上页所示l 使用匿名类l 使用Lambda表达式单数据流转换-map/匿名类 DataStream anonymousDataStream=dataStream.map(new MapFunction()Override public String map(Integer input)throws Exception return anonymous function input:+input+,output:+(input*2););/使用Lambda表达式 DataStream lambdaStream=dataStream.map(

    6、input-lambda input:+input+,output:+(input*2);匿名类实现MapFunctionLambda表达式实现MapFunctionl 对输入元素进行过滤l 继承并实现FilterFunction或RichFilterFunctionl 重写filter虚方法l True 保留l False 过滤单数据流转换-filterDataStream dataStream=senv.fromElements(1,2,-3,0,5,-9,8);/使用-构造Lambda表达式 DataStream lambda=dataStream.filter(input-input

    7、0);public static class MyFilterFunction extends RichFilterFunction /limit参数可以从外部传入 private Integer limit;public MyFilterFunction(Integer limit)this.limit=limit;Override public boolean filter(Integer input)return input this.limit;Lambda表达式实现FilterFunction实现FilterFunctionl 与map()相似l 输出零个、一个或多个元素l 可对列表

    8、结果展平单数据流转换-flatMap苹果,梨,香蕉.map(去皮)去皮苹果,去皮梨,去皮香蕉 mapflatMap苹果,梨,香蕉.flatMap(切碎)苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1 苹果碎片1,苹果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1l 使用Lambda表达式l Collector用来收集元素flatMap()虚方法中不使用return返回数据,使用Collector收集返回数据Collector中的泛型String为返回数据类型l 将flatMap()看做map()和filter()更一般的形式map()和filter()的语义更明确单数据流转

    9、换-flatMapDataStream dataStream=senv.fromElements(Hello World,Hello this is Flink);/split函数的输入为 Hello World 输出为 Hello 和 World 组成的列表 Hello,World/flatMap将列表中每个元素提取出来/最后输出为 Hello,World,Hello,this,is,Flink DataStream words=dataStream.flatMap(String input,Collector collector)-for(String word:input.split()

    10、collector.collect(word);).returns(Types.STRING);l 数据分组后可进行聚合操作l keyBy()将一个DataStream转化为一个KeyedStreaml 聚合操作将KeyedStream转化为DataStreaml KeyedStream继承自DataStream基于Key的分组转换l 根据某种属性或数据的某些字段对数据进行分组l 对一个分组内的数据进行处理l 股票:相同股票代号的数据分组到一起l 相同Key的数据被分配到同一算子实例上l 需要指定Key数字位置字段名KeySelector基于Key的分组转换-keyByDataStreamTu

    11、ple2 dataStream=senv.fromElements(Tuple2.of(1,1.0),Tuple2.of(2,3.2),Tuple2.of(1,5.5),Tuple2.of(3,10.0),Tuple2.of(3,12.5);/使用数字位置定义Key 按照第一个字段进行分组 DataStreamTuple2 keyedStream=dataStream.keyBy(0).sum(1);l KeySelectorl 重写getKey()方法单数据流转换-keyBy/IN为数据流元素,KEY为所选择的Key FunctionalInterface public interface

    12、KeySelector extends Function,Serializable /选择一个字段作为Key KEY getKey(IN value)throws Exception;public class Word public String word;public int count;/使用KeySelector DataStream keySelectorStream=wordStream.keyBy(new KeySelector()Override public String getKey(Word in)return in.word;).sum(count);KeySelecto

    13、r源码一个KeySelector的实现l sum()、max()、min()等l 指定字段,对该字段进行聚合KeySelectorl 流数据上的聚合实时不断输出到下游状态存储中间数据单数据流转换 Aggregationsl 将某个字段加和l 结果保存到该字段上l 不关心其他字段的计算结果单数据流转换 sumDataStreamTuple3 tupleStream=senv.fromElements(Tuple3.of(0,0,0),Tuple3.of(0,1,1),Tuple3.of(0,2,2),Tuple3.of(1,0,6),Tuple3.of(1,1,7),Tuple3.of(1,0,

    14、8);/按第一个字段分组,对第二个字段求和,打印出来的结果如下:/(0,0,0)/(0,1,0)/(0,3,0)/(1,0,6)/(1,1,6)/(1,1,6)DataStreamTuple3 sumStream=tupleStream.keyBy(0).sum(1);l max()对该字段求最大值结果保存到该字段上不保证其他字段的计算结果l maxBy()对该字段求最大值其他字段保留最大值元素的值单数据流转换 max/maxByDataStreamTuple3 tupleStream=senv.fromElements(Tuple3.of(0,0,0),Tuple3.of(0,1,1),Tu

    15、ple3.of(0,2,2),Tuple3.of(1,0,6),Tuple3.of(1,1,7),Tuple3.of(1,0,8);/按第一个字段分组,对第三个字段求最大值max,打印出来的结果如下:/(0,0,0)/(0,0,1)/(0,0,2)/(1,0,6)/(1,0,7)/(1,0,8)DataStreamTuple3 maxStream=tupleStream.keyBy(0).max(2);/按第一个字段分组,对第三个字段求最大值maxBy,打印出来的结果如下:/(0,0,0)/(0,1,1)/(0,2,2)/(1,0,6)/(1,1,7)/(1,0,8)DataStreamTup

    16、le3 maxByStream=tupleStream.keyBy(0).maxBy(2);l 比Aggregation更通用l 在KeyedStream上生效l 接受两个输入,生成一个输出l 两两合一地汇总操作基于Key的分组转换-reducel 实现ReduceFunction基于Key的分组转换-reducepublic static class MyReduceFunction implements ReduceFunction Override public Score reduce(Score s1,Score s2)return Score.of(s1.name,Sum,s1.s

    17、core+s2.score);DataStream dataStream=senv.fromElements(Score.of(Li,English,90),Score.of(Wang,English,88),Score.of(Li,Math,85),Score.of(Wang,Math,92),Score.of(Liu,Math,91),Score.of(Liu,English,87);/实现ReduceFunction DataStream sumReduceFunctionStream=dataStream.keyBy(name).reduce(new MyReduceFunction(

    18、);/使用 Lambda 表达式 DataStream sumLambdaStream=dataStream.keyBy(name).reduce(s1,s2)-Score.of(s1.name,Sum,s1.score+s2.score);l 将多个同类型的DataStream合并为一个DataStreaml 数据按照先进先出(FIFO)合并多数据流转换-unionDataStream shenzhenStockStream=.DataStream hongkongStockStream=.DataStream shanghaiStockStream=.DataStream unionSto

    19、ckStream=shenzhenStockStream.union(hongkongStockStream,shanghaiStockStream);l 只能连接两个DataStream数据流l 两个数据流类型可以不一致l 两个DataStream经过connect()之后转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态l 应用场景为:使用一个控制流对另一个数据流进行控制多数据流转换-connectl 重写CoMapFunction或CoFlatMapFunctionl 三个泛型,分别对应第一个输入流的数据类型

    20、、第二个输入流的数据类型和输出流的数据类型l 对于CoFlatMapFunction,flatMap1()方法处理第一个流的数据,flatMap2()方法处理第二个流的数据l 可以做到类似SQL Join的效果多数据流转换-connect/IN1为第一个输入流的数据类型/IN2为第二个输入流的数据类型/OUT为输出类型 public interface CoFlatMapFunction extends Function,Serializable /处理第一个流的数据 void flatMap1(IN1 value,Collector out)throws Exception;/处理第二个流的

    21、数据 void flatMap2(IN2 value,Collector out)throws Exception;/CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出 public static class MyCoMapFunction implements CoMapFunction Override public String map1(Integer input1)return input1.toString();Override public String map2(String input2)return input2;CoFlatMapF

    22、unction源代码一个CoFlatMapFunction实现l 并行度逻辑视图中的算子被切分为多个算子子任务每个算子子任务处理一部分数据可以在整个作业的执行环境层面设置也可以对某个算子单独设置并行度StreamExecutionEnvironment senv=StreamExecutionEnvironment.getExecutionEnvironment();/获取当前执行环境的默认并行度 int defaultParalleism=senv.getParallelism();/设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4 senv.setParallelism(4);

    23、在执行环境中设置并行度:对某个算子单独设置:dataStream.map(new MyMapper().setParallelism(defaultParallelism*2);l 默认情况下,数据自动分布到多个实例(或者称之为分区)上l 手动在多个实例上进行数据分配避免数据倾斜l 输入是DataStream,输出也是DataStream数据重分布dataStream.shuffle();基于正态分布,将数据随机分配到下游各算子实例上:dataStream.broadcast();数据会被复制并广播发送给下游的所有实例上:dataStream.global();将所有数据发送给下游算子的第一个

    24、实例上:l rebalance()使用Round-Ribon思想将数据均匀分配到各实例上l rescale()就近发送给下游每个实例数据重分布rebalance()将数据轮询式地分布到下游子任务上 当上游有2个子任务、下游有4个子任务时使用rescale()l partitionCustom()自定义数据重分布逻辑l PartitionerK中泛型K为根据哪个字段进行分区对一个Score类型数据流重分布,希望按照id均匀分配到下游各实例,那么泛型K就为id的数据类型Long重写partition()方法数据重分布FunctionalInterface public interface Part

    25、itioner extends java.io.Serializable,Function /根据key决定该数据分配到下游第几个分区(实例)int partition(K key,int numPartitions);/*Partitioner 其中泛型T为指定的字段类型 *重写partiton函数,并根据T字段对数据流中的所有元素进行数据重分配 */public static class MyPartitioner implements Partitioner private Random rand=new Random();private Pattern pattern=Ppile(.*

    26、d+.*);/*key 泛型T 即根据哪个字段进行数据重分配,本例中是Tuple2(Int,String)中的String *numPartitons 为当前有多少个并行实例 *函数返回值是一个Int 为该元素将被发送给下游第几个实例 */Override public int partition(String key,int numPartitions)int randomNum=rand.nextInt(numPartitions/2);Matcher m=pattern.matcher(key);if(m.matches()return randomNum;else return ran

    27、domNum+numPartitions/2;/对(Int,String)中的第二个字段使用 MyPartitioner 中的重分布逻辑 DataStreamTuple2 partitioned=dataStream.partitionCustom(new MyPartitioner(),1);Partitioner源码 一个Partitioner的实现l 数据传输、持久化l 序列化:将内存对象转换成二进制串、网络可传输或可持久化l 反序列化:将二进制串转换为内存对象,可直接在编程语言中读写和操作l 常见序列化方式:JSONJava、Kryo、Avro、Thrift、Protobufl Fli

    28、nk开发了自己的序列化框架更早地完成类型检查节省数据存储空间序列化和反序列化l基础类型Java、Scala基础数据类型l数组l复合类型Scala case classJava POJOTuplel辅助类型Option、List、Mapl泛型和其他类型GenericFlink支持的数据类型l TypeInformaton用来表示数据类型,创建序列化器l 每种数据类型都对应一个TypeInfomationTupleTypeInfo、PojoTypeInfo TypeInformationl Flink会自动推断类型,调用对应的序列化器,对数据进行序列化和反序列化类型推断和序列化package mo

    29、n.typeinfo;public class Types /java.lang.Void public static final TypeInformation VOID=BasicTypeInfo.VOID_TYPE_INFO;/java.lang.String public static final TypeInformation STRING=BasicTypeInfo.STRING_TYPE_INFO;/java.lang.Boolean public static final TypeInformation BOOLEAN=BasicTypeInfo.BOOLEAN_TYPE_IN

    30、FO;/java.lang.Integer public static final TypeInformation INT=BasicTypeInfo.INT_TYPE_INFO;/java.lang.Long public static final TypeInformation LONG=BasicTypeInfo.LONG_TYPE_INFO;.一些基础类型的TypeInformation:l Types.STRING 是用来表示 java.lang.String 的TypeInformationl Types.STRING 被定义为 BasicTypeInfo.STRING_TYPE_

    31、INFOl STRING_TYPE_INFO:使用何种序列化器和比较器类型推断和序列化public static final BasicTypeInfo STRING_TYPE_INFO=new BasicTypeInfo(String.class,new Class,StringSerializer.INSTANCE,StringComparator.class);STRING_TYPE_INFO定义使用何种序列化器和比较器:l 在声明式文件中定义Schemal 使用工具将Schema转换为Java可用的类l Avro Specific生成的类与POJO类似有getter、setter方法在

    32、Flink中可以像使用POJO一样使用Avro Specific模式l Avro Generic不生成具体的类用GenericRecord封装所有用户定义的数据结构必须给Flink提供Schema信息Avro namespace:org.apache.flink.tutorials.avro,type:record,name:MyPojo,fields:name:id,type:int,name:name,type:string Avro声明式文件:l Kryo是大数据领域经常使用的序列化框架l Flink无法推断出数据类型时,将该数据类型定义为GenericTypeInfo,使用Kryo作为

    33、后备选项进行序列化l 最好实现自己的序列化器,并对数据类型和序列化器进行注册l Kryo在有些场景效率不高l env.getConfig.disableGenericTypes()禁用Kryo,可以定位到具体哪个类型无法被Flink自动推断,然后针对该类型创建更高效的序列化器Kryo注册数据类型和序列化器:/将MyCustomType类进行注册 env.getConfig().registerKryoType(MyCustomType.class);/或者使用下面的方式并且实现自定义序列化器 env.getConfig().registerTypeWithKryoSerializer(MyCu

    34、stomType.class,MyCustomSerializer.class);static class MyClassSerializer extends Serializer implements Serializable private static final long serialVersionUID=.Override public void write(Kryo kryo,Output output,MyCustomType myCustomType).Override public MyCustomType read(Kryo kryo,Input input,Class t

    35、ype).l 与Avro Specific模式相似,使用声明式语言定义Schema,使用工具将声明式语言转化为Java类l 有人已经实现好Kryo的序列化器l 案例:MyCustomType是使用Thrift工具生成的Java类,TBaseSerializer是com.twitter:chill-thrift包中别人实现好的序列化器,该序列化器基于Kryo的Serializer。l 注意在pom.xml中添加相应的依赖Thrift、Protobuf/Google Protobuf/MyCustomType类是使用Protobuf生成的Java类/ProtobufSerializer是别人实现好

    36、的序列化器 env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,ProtobufSerializer.class);/Apache Thrift/MyCustomType是使用Thrift生成的Java类/TBaseSerializer是别人实现好的序列化器 env.getConfig().addDefaultKryoSerializer(MyCustomType.class,TBaseSerializer.class);l Flink的数据类型:Java、Scala、Table API分别有自己的数据类型体系l

    37、绝大多数情况下,程序员不需要关心使用何种TypeInformation,只需要使用自己所需的数据类型l Flink会做类型推断、选择对应的序列化器l 当自动类型推断失效,用户需要关注TypeInformationl 数据类型选择:需要考虑:上下游的数据结构、序列化器的性能、状态数据的持续迭代能力POJO和Tuple等内置类型性能更好Avro、Thrift和Protobuf对上下游数据的兼容性更好,不需要在Flink应用中重新设计一套POJOPOJO和Avro对Flink状态数据的持续迭代更友好数据类型小结l 用户自定义函数的三种方式:继承并实现函数类 使用Lambda表达式 继承并实现Rich

    38、函数类用户自定义函数l 对于map()、flatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等interface接口。l 以FlatMapFunction函数式接口为例:继承了Flink的Function函数式接口函数在运行过程中要发送到各个实例上,发送前后要进行序列化和反序列化,一定要保证函数内的所有内容都可以被序列化两个泛型T和O,T是输入,O是输出,要设置好输入和输出数据类型,否则会报错重写虚方法flatMap()Collector收集输出数据函数类package mon.functions;Func

    39、tionalInterface public interface FlatMapFunction extends Function,Serializable void flatMap(T value,Collector out)throws Exception;/使用FlatMapFunction实现过滤逻辑,只对字符串长度大于 limit 的内容进行词频统计 public static class WordSplitFlatMap implements FlatMapFunction private Integer limit;public WordSplitFlatMap(Integer

    40、limit)this.limit=limit;Override public void flatMap(String input,Collector collector)throws Exception if(input.length()limit)for(String word:input.split()collector.collect(word);DataStream dataStream=senv.fromElements(Hello World,Hello this is Flink);DataStream functionStream=dataStream.flatMap(new

    41、WordSplitFlatMap(10);FlatMapFunction源码一个FlatMapFunction实现l 简洁紧凑l Scala对Lambda表达式支持更好l Java 8之后也开始支持Lambda表达式,有类型擦除问题使用returns 提供类型信息Lambda表达式DataStream words=dataStream.flatMap(String input,Collector collector)-for(String word:input.split()collector.collect(word);)/提供类型信息以解决类型擦除问题.returns(Types.STRI

    42、NG);val lambda=dataStream.flatMap(value:String,out:CollectorString)=if(value.size 10)value.split().foreach(out.collect)Scala:Java:l RichMapFunction、RichFlatMapFunction、RichReduceFunctionl 增加了更多功能:open()方法:初始化close()方法:算子最后执行这个方法,可以释放一些资源getRuntimeContext()方法:获取算子子任务的运行时上下文l 累加器例子:分布式计算环境下,计算是分布在多台节点

    43、上的,每个节点处理一部分数据,使用for循环无法满足累加器功能Rich函数类/实现RichFlatMapFunction类/添加了累加器 Accumulator public static class WordSplitRichFlatMap extends RichFlatMapFunction private int limit;/创建一个累加器 private IntCounter numOfLines=new IntCounter(0);public WordSplitRichFlatMap(Integer limit)this.limit=limit;Override public

    44、void open(Configuration parameters)throws Exception super.open(parameters);/在RuntimeContext中注册累加器 getRuntimeContext().addAccumulator(num-of-lines,this.numOfLines);Override public void flatMap(String input,Collector collector)throws Exception /运行过程中调用累加器 this.numOfLines.add(1);if(input.length()limit)for(String word:input.split()collector.collect(word);

    展开阅读全文
    提示  163文库所有资源均是用户自行上传分享,仅供网友学习交流,未经上传用户书面授权,请勿作他用。
    关于本文
    本文标题:Flink原理与实践-DataStream-API的介绍和使用课件.pptx
    链接地址:https://www.163wenku.com/p-5218050.html

    Copyright@ 2017-2037 Www.163WenKu.Com  网站版权所有  |  资源地图   
    IPC备案号:蜀ICP备2021032737号  | 川公网安备 51099002000191号


    侵权投诉QQ:3464097650  资料上传QQ:3464097650
       


    【声明】本站为“文档C2C交易模式”,即用户上传的文档直接卖给(下载)用户,本站只是网络空间服务平台,本站所有原创文档下载所得归上传人所有,如您发现上传作品侵犯了您的版权,请立刻联系我们并提供证据,我们将在3个工作日内予以改正。

    163文库