书签 分享 收藏 举报 版权申诉 / 48
上传文档赚钱

类型spark分享分析-共48页课件.ppt

  • 上传人(卖家):三亚风情
  • 文档编号:3592688
  • 上传时间:2022-09-22
  • 格式:PPT
  • 页数:48
  • 大小:3.92MB
  • 【下载声明】
    1. 本站全部试题类文档,若标题没写含答案,则无答案;标题注明含答案的文档,主观题也可能无答案。请谨慎下单,一旦售出,不予退换。
    2. 本站全部PPT文档均不含视频和音频,PPT中出现的音频或视频标识(或文字)仅表示流程,实际无音频或视频文件。请谨慎下单,一旦售出,不予退换。
    3. 本页资料《spark分享分析-共48页课件.ppt》由用户(三亚风情)主动上传,其收益全归该用户。163文库仅提供信息存储空间,仅对该用户上传内容的表现方式做保护处理,对上传内容本身不做任何修改或编辑。 若此文所含内容侵犯了您的版权或隐私,请立即通知163文库(点击联系客服),我们立即给予删除!
    4. 请根据预览情况,自愿下载本文。本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
    5. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007及以上版本和PDF阅读器,压缩文件请下载最新的WinRAR软件解压。
    配套讲稿:

    如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。

    特殊限制:

    部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。

    关 键  词:
    spark 分享 分析 48 课件
    资源描述:

    1、目录Spark简介Spark批处理Spark集群模式SparkSQLSpark StreamingSpark简介Spark是什么Spark特点Spark生态系统Spark与Hadoop的区别PPT模板下载:1ppt/moban/Spark是什么官网介绍:官网介绍:Apache Spark is a fast and general engine for large-scale data processing.Spark是加州大学伯克利分校AMP实验室开发的通用内存并行计算框架,分布式资源管理工作交由集群管理工具(Mesos、YARN)PPT模板下载:1ppt/moban/Spark特点1.先进

    2、架构:Spark采用Scala语言编写,底层采用actor mode的akka作为通讯架构,代码十分简洁高效。基于DAG图的执行引擎,减少多次计算之间中间结果写到HDFS的开销。建立在统一抽象的RDD(分布式内存抽象)之上,使得它可以以基本一致的方式应对不同的大数据处理场景。2.运行速度快:提供Cache机制来支持需要反复迭代的计算,减少数据读取的IO开销3.易用性好:Spark提供广泛的数据集操作类型(各种转换算子,行动算子等)Spark支持Java,Python和Scala API,支持交互式Python和Scala的shell4.通用性强:以其RDD模型的强大表现能力,逐渐形成了一套自己

    3、的生态圈,提供了full-stack的解决方案。主要包括Spark内存中批处理,Spark SQL交互式查询,Spark Streaming流式计算,Mllib机器学习算法,GraphX图计算。5.与Hadoop无缝衔接:Spark可以使用YARN作为它的集群管理器读取HDFS,HBASE的Hadoop的数据PPT模板下载:1ppt/moban/Spark生态圈也称为BDAS(伯克利数据分析栈),是伯克利APMLab实验室打造的,力图在算法(Algorithms)、机器(Machines)、人(People)之间通过大规模集成来展现大数据应用的一个平台。Spark生态圈以Spark Core为

    4、核心,从HDFS、Cassandra、Amazon S3和HBase等持久层读取数据,以MESS、YARN和自身携带的Standalone为资源管理器调度Job完成Spark应用程序的计算。这些应用程序可以来自于不同的组件,如Spark Shell/Spark Submit的批处理、Spark Streaming的实时流处理应用、Spark SQL的即席查询、MLlib的机器学习、GraphX的图处理和SparkR的数学计算等等。Spark生态系统PPT模板下载:1ppt/moban/Spark与Hadoop的区别HadoopSpark抽象层次低,需要手工编写代码来完成,使用上难以上手基于RD

    5、D的抽象,使数据处理逻辑的代码非常简短只提供两个操作,Map和Reduce,表达力欠缺提供很多转换和动作,很多基本操作如Join,GroupBy已经在RDD转换和动作中实现中间结果也放在HDFS文件系统中中间结果放在内存中,内存放不下了会写入本地磁盘ReduceTask需要等待所有MapTask都完成后才可以开始分区相同的转换构成流水线放在一个Task中运行,分区不同的转换需要Shuffle,被划分到不同的Stage中,需要等待前面的Stage完成后才可以开始时延高,只适用Batch数据处理,对于交互式数据处理,实时数据处理的支持不够通过将流拆成小的batch提供Discretized Str

    6、eam处理流数据对于迭代式数据处理性能比较差通过在内存中缓存数据,提高迭代式计算的性能历经10年发展,已在生产环境稳定运行多年运行不够稳定PPT模板下载:1ppt/moban/Spark与Hadoop的区别Hadoop数据抽取运算模型:反复读写,磁盘IO是瓶颈Spark数据抽取运算模型:Spark批处理RDD简介Spark程序入口创建RDDRDD操作TransformationsActionsPPT模板下载:1ppt/moban/RDD(Resilient Distributed Dataset):弹性分布式数据集-分布式内存抽象的概念RDD是Spark对数据的核心抽象,是Spark的基石。R

    7、DD是一个可容错、只读的、已被分区的、可并行操作的分布式元素集合RDD的特点:1.只读:状态不可变,不能修改2.分区:支持元素根据 Key 来分区(Partitioning),保存到多个结点上,还原时只会重新计算丢失分区的数据,而不会影响整个系统3.RDD必须是可序列化的4.路径:在 RDD 中叫血统(lineage),即 RDD 有充足的信息关于它是如何从其他 RDD 产生而来的5.持久化:可以控制存储级别(内存、磁盘等)来进行持久化在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示,即数据分区的集合,能根据本地性快速访问到数据的偏好位置,依赖关系,计算方法,是否是哈希/范围分区的

    8、元数据:RDD简介操作操作含义含义partitions()返回一组Partition对象preferredLocations(p)根据数据存放的位置,返回分区p在哪些节点访问更快dependencies()返回一组依赖iterator(p,parentIters)按照父分区的迭代器,逐个计算分区p的元素(计算函数)partitioner()返回RDD是否hash/range分区的元数据信息PPT模板下载:1ppt/moban/SparkContext:Spark应用程序需要做的第一件事就是创建一个 SparkContext 对象,SparkContext对象决定了Spark如何访问集群。而要新

    9、建一个SparkContext对象,你还得需要构造一个 SparkConf 对象,SparkConf对象包含了你的应用程序的配置信息。每个JVM进程中,只能有一个活跃(active)的SparkContext对象。如果你非要再新建一个,那首先必须将之前那个活跃的SparkContext 对象stop()掉。Spark中已经有一个创建好的SparkContext,简称scScala:import org.apache.spark.SparkContext import org.apache.spark.SparkConfval conf=new SparkConf().setAppName(ap

    10、pName).setMaster(master)val sc=new SparkContext(conf)Python:from pyspark import SparkConf,SparkContextconf=SparkConf().setAppName(appName).setMaster(master)sc=SparkContext(conf=conf)Spark程序入口PPT模板下载:1ppt/moban/一、并行化集合一、并行化集合并行化集合是以一个已有的集合对象(例如:Scala Seq)为参数,调用 SparkContext.parallelize()方法创建得到的RDD。集合

    11、对象中所有的元素都将被复制到一个可并行操作的分布式数据集中。Scala:Python:val data=Array(1,2,3,4,5)data=1,2,3,4,5val distData=sc.parallelize(data)distData=sc.parallelize(data)二、外部数据集二、外部数据集Spark 可以通过Hadoop所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他 Hadoop 支持的输入

    12、格式(InputFormat)。Scala:val distFile=sc.textFile(data.txt)Python:distFile=sc.textFile(data.txt)三、通过转换现有的三、通过转换现有的RDD得到得到四、改变现有四、改变现有RDD的持久性的持久性(cache、save)创建RDDPPT模板下载:1ppt/moban/日志挖掘:val lines=sc.textFile(“hdfs:/.”)val errors=lines.filter(_.startsWith(“ERROR”)errors.cache()errors.filter(_.contains(“H

    13、DFS”).map(_.split(t)(3).take(10)作用于RDD上的操作分为转换(transformation)和动作(action)。Spark中的所有transformation都是惰性的,在执行transformation,并不会提交Job,只有在执行action操作,才会被提交到集群中真正的被执行。Transformation:将已有RDD转换得到一个新的RDD。Action:计算,返回结果或把RDD数据写到存储系统中。RDD操作PPT模板下载:1ppt/moban/窄依赖(narrow dependencies):父RDD的每个分区都只被子RDD的一个分区所依赖比如map

    14、、filter、union等宽依赖(wide dependencies):父RDD的分区被多个子RDD的分区所依赖。比如groupByKey、reduceByKey等1.程序优化:窄依赖支持在一个节点上管道化执行。如在filter之后执行map2.容错:窄依赖支持更高效的故障还原。只有丢失的父RDD的分区需要重新计算宽依赖需要所有父RDD的分区,因此就需要完全重新执行Checkpoint:Lineage链较长、宽依赖的RDD需要采用检查点机制。RDD依赖PPT模板下载:1ppt/moban/Transformation作用作用示例示例结果结果map(func)返回一个新的数据集,其中每个元素都

    15、是由源RDD中一个元素经func转换得到的Rdd.map(x=x+1)1,2,3,32,3,4,4filter(func)返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后的结果rdd.filter(x=x!=1)1,2,3,32,3,3flatMap(func)类似于map,但每个输入元素可以映射到0到n个输出元素Rdd.flatMap(x=x.split(“)“hello world”,”hi”“hello”,”world”,”hi”union(otherDataset)返回源数据集和参数数据集的并集Rdd.union(other)1,2,3 3,4,51,2,3,3,4

    16、,5distinct(numTasks)返回对源数据集做元素去重后的新数据集Rdd.distinct()1,2,3,31,2,3groupByKey(numTasks)若源RDD包含(K,V)对,则返回一个新的数据集包含(K,Iterable)对Rdd.groupByKey()Rdd=(1,2),(3,4),(3,6)(1,2),(3,4,6)reduceByKey(func,numTasks)若源RDD为(K,V)对,则为(K,V)对的RDD,每个key对应的value是经过func聚合后的结果Rdd.reduceByKey(x,y)=x+y)Rdd=(1,2),(3,4),(3,6)(1,

    17、2),(3,10)join(otherDataset,numTasks)若源RDD为(K,V)且参数RDD为(K,W),则返回的新RDD中将包含内关联后key对应的(K,(V,W)对Rdd.join(other)Rdd=(1,2),(3,4),(3,6)Other=(3,9)(3,(4,9),(3,(6,9)TransformationsPPT模板下载:1ppt/moban/Action作用作用示例示例1,2,3,3结果结果reduce(func)将RDD中元素按func进行聚合Rdd.reduce(x,y)=x+y)9collect()将数据集中所有元素以数组形式返回驱动器(driver)程

    18、序。Rdd.collect()1,2,3,3count()返回数据集中元素个数Rdd.count()4first()返回数据集中首个元素(类似于 take(1))Rdd.first()1take(n)返回数据集中前 n个元素Rdd.take(2)1,2saveAsTextFile(path)将数据集中元素保存到指定目录下的文本文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。saveAsSequenceFile(path)将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。saveA

    19、sObjectFile(path)将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。ActionsSpark集群模式Spark完整示例集群模式概览术语解释Stage划分Spark任务调度Spark运行模式PPT模板下载:1ppt/moban/Spark完整示例Scala:submit提交:spark-submit-master yarn-class com.xxx AppName.jarimport org.apache.spark.SparkContext import org.apache.spark.SparkCo

    20、nf object SimpleApp def main(args:ArrayString)val logFile=YOUR_SPARK_HOME/README.md val conf=new SparkConf().setAppName(Simple Application)val sc=new SparkContext(conf)val logData=sc.textFile(logFile,2).cache()val numAs=logData.filter(line=line.contains(a).count()val numBs=logData.filter(line=line.c

    21、ontains(b).count()println(Lines with a:%s,Lines with b:%s.format(numAs,numBs)Python:submit提交:spark-submit-master yarn-executor-memory 10g AppName.pyfrom pyspark import SparkContextlogFile=YOUR_SPARK_HOME/README.md sc=SparkContext(local,Simple App)logData=sc.textFile(logFile).cache()numAs=logData.fil

    22、ter(lambda s:a in s).count()numBs=logData.filter(lambda s:b in s).count()print(Lines with a:%i,lines with b:%i%(numAs,numBs)PPT模板下载:1ppt/moban/在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。Spark会为该应用在各个集群节点上申请executor,用于执行计算任务和存储数据。接下来,Spark将应用程序代

    23、码(JAR包或者Python文件)发送给所申请到的executor。最后SparkContext将分割出的task发送给各个executor去运行。集群模式概览PPT模板下载:1ppt/moban/注意:1.每个Spark应用程序都有其对应的多个executor进程,executor进程在整个应用程序生命周期内,都保持运行状态,并以多线程方式运行所收到的任务。好处:可以隔离各个Spark应用,从调度角度来看,每个driver可以独立调度本应用程序内部的任务,从执行器角度来看,不同的Spark应用对应的任务将会在不同的JVM中运行。坏处:多个Spark应用程序之间无法共享数据,除非把数据写到外部

    24、存储中。2.Spark对底层的ClusterManager一无所知。只要Spark能申请到executor进程,并且能与之通信即可。3.driver在整个生命周期内必须监听并接受其对应的各个Executor的连接请求。因此,driver必须能够被所有worker节点访问到。4.因为集群上的任务是由driver来调度的,所以driver应该和worker节点距离近一些,最好在同一个本地局域网中。如果你需要远程对集群发起请求,最好还是在driver节点上启动RPC服务,来响应这些远程请求,同时把driver本身放在集群worker节点比较近的机器上。集群模式概览PPT模板下载:1ppt/moban

    25、/术语术语描述描述Application用户编写的Spark应用程序,包含一个Driver program和分布在集群中多个节点上运行的若干ExecutorDriver program运行Application的main()函数并且创建SparkContext Cluster manager在集群上获取资源的外部服务(例如:Standalone、Mesos、Yarn)Worker node集群中任何可以运行Application代码的节点ExecutorApplication运行在Worker 节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Applicatio

    26、n都有各自独立的一批ExecutorTask被送到某个Executor上的工作单元Job包含多个Task组成的并行计算,往往由Spark Action催生Stage每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段DAGScheduler实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Taskset放到TaskScheduler中TaskScheduler与DAGScheduler交互,实现Task分配到Executor上执行术语解释PPT模板下载:1ppt/mo

    27、ban/Stage划分stage的边界有两种情况:1.宽依赖上的Shuffle操作;2.已缓存分区,它可以缩短父RDD的计算过程。一个stage的开始就是从外部存储或shuffle结果中读取数据,一个stage的结束就是发生shuffle或生成结果时PPT模板下载:1ppt/moban/1.创建RDD,经过一系列Transformation,最后Action2.Action会触发SparkContext的rujob方法,交给DAGScheduler处理3.DAGScheduler将DAG划分成Stage4.将Stage交给TaskScheduler5.集群的Executor上运行Spark任务

    28、调度PPT模板下载:1ppt/moban/一个按 A-Z 首字母分类,查找相同首字母下不同姓名总个数的例子:步骤 1:创建 RDD 上面的例子除去最后一个 collect 是个动作,不会创建 RDD 之外,前面四个转换都会创建出新的 RDD。因此第一步就是创建好所有 RDD(内部的五项信息)。步骤 2:创建执行计划 Spark 会尽可能地管道化,并基于是否要重新组织数据来划分 阶段(stage),例如本例中的 groupByKey()转换就会将整个执行计划划分成两阶段执行。最终会产生一个 作为逻辑执行计划。步骤 3:调度任务将各阶段划分成不同的 任务(task),每个任务都是数据和计算的合体。

    29、在进行下一阶段前,当前阶段的所有任务都要执行完成。sc.textFile(“hdfs:/names”).map(name=(name.charAt(0),name).groupByKey().mapValues(name=names.toSet.size).collect()RDD执行过程PPT模板下载:1ppt/moban/运行环境运行环境模式模式描述描述Local单机模式常用于本地开发测试,分为local单线程和local-cluster多线程Standalone集群模式Spark自带,最简单的集群模式Hadoop Yarn集群模式运行在Yarn资源管理器之上,由Yarn负责资源管理,Sp

    30、ark负责任务调度和计算Apache Mesos集群模式运行在Mesos资源管理器之上,由Yarn负责资源管理,Spark负责任务调度和计算Amazon Ec2集群模式运行在云端的集群Spark运行模式Spark SQLSparkSQL简介SparkSQL入口DataFrame简介DataFrame创建DataFrame操作和RDD互操作PPT模板下载:1ppt/moban/SQL-on-Hadoop:Hive是SQL-on-Hadoop最常用的工具,但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的SQL

    31、-on-Hadoop工具开始产生,其中表现较为突出的是:MapR的DrillCloudera的ImpalaHortonworks的Hive on TezFacebook的PrestoSparkSQL(Shark-Hive on Spark)但Shark对于Hive的太多依赖,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。SparkSQL简介PPT模板下载:1ppt/moban/入口:入口:SQLContext:SparkSQL的所有功能入口都是SQLContext类及其子类。要创建一个SQLContext对象,首先需要有一个SparkContext对象。Scala:Val

    32、 sc:SparkContext/假设已经有一个SparkContext对象Val sqlContext=new org.apache.spark.sql.SQLContext(sc)Python:sqlContext=SQLContext(sc)HiveContext:HiveContext继承自SQLContext,除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。Scala:Val HiveContext=new org.apache.spark.sql.hive.HiveContext(sc)Python

    33、:HiveContext=HiveContext(sc)SparkSQL入口PPT模板下载:1ppt/moban/DataFrame:是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。相对于RDD,DataFrame有几个特点:1.包含schema信息,能够进行针对性的优化。2.对用户有更加友好、更直观的API。3.与外部数据源 API紧密

    34、集成,可以用作多种存储格式和存储系统间的数据交换媒介。DataFrame简介PPT模板下载:1ppt/moban/创建DataFrame1.从json文件创建DataFrame:val df=hiveContext.read.json(examples/src/main/resources/people.json)2.从parquet文件创建DataFrame:val df=hiveContext.read.parquet(examples/src/main/resources/people.parquet)3.从orc文件创建DataFrame:val df=hiveContext.read

    35、.orc(examples/src/main/resources/people.orc)4.从hive表创建DataFrame:val df=hiveContext.table(gdpi)5.从txt文件创建DataFrame:val df=hiveContext.read.text(/path/to/spark/README.md)PPT模板下载:1ppt/moban/DataFrame操作1.展示 DataFrame 的内容 df.show()/age name/null Michael/30 Andy/19 Justin2.展示所有人,但所有人的 age 都加1 df.select(df

    36、(name),df(age)+1).show()/name (age+1)/Michael null/Andy 31/Justin 20 3.计算各个年龄的人数 df.groupBy(age).count().show()/age count/null 1/19 1/30 1PPT模板下载:1ppt/moban/DataFrame操作SQL操作:1.首先把DataFrame注册为临时表df.registerTempTable(people)2.HiveContext.sql执行SQL查询,并返回DataFrame,语法与Hql一致val teenagers=hiveContext.sql(SE

    37、LECT name,age FROM people WHERE age=13 AND age Person(p(0),p(1).trim.toInt).toDF()people.registerTempTable(people)/sqlContext.sql方法可以直接执行SQL语句 val teenagers=sqlContext.sql(SELECT name,age FROM people WHERE age=13 AND age StructField(fieldName,StringType,true)/将RDDpeople的各个记录转换为Rows,即:得到一个包含Row对象的RDD

    38、val rowRDD=people.map(_.split(,).map(p=Row(p(0),p(1).trim)/将schema应用到包含Row对象的RDD上,得到一个DataFrame val peopleDataFrame=sqlContext.createDataFrame(rowRDD,schema)/将DataFrame注册为table peopleDataFrame.registerTempTable(people)/执行SQL语句 val results=sqlContext.sql(SELECT name FROM people)PPT模板下载:1ppt/moban/和RD

    39、D互操作编程方式定义编程方式定义Schema:Python:From pyspark.sql import SQLContextFrom pyspar.sql.types import*sqlContext=SQLContext(sc)/加载文件Lines=sc.textFile(“examples/src/main/resources/people.txt”)Parts=lines.map(lambda l:l.split(“,”)/转换每一行为元组People=parts.map(lambda p:(p0,p1.strip()/定义schemaschemaString=“name age”

    40、Fields=StructField(field_name,StringType(),True)for field_name in schemaString.split()Schema=StructType(fields)/将schema应用到RDD上schemaPeople=sqlContext.createDataFrame(people,schema)/注册为表schemaPeople.registerTempTable(“people”)Results=sqlContext.sql(“SELECT name FROM people”)Spark StreamingSpark Strea

    41、ming简介Spark Streaming与Strom的对比Spark Streaming工作原理Spark Streaming编程离散数据流转换和输出算子PPT模板下载:1ppt/moban/Spark Streaming简介Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,

    42、数据库数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如机器学习、图计算等,对流数据进行处理。PPT模板下载:1ppt/moban/Spark Streaming与Strom的区别StromSpark Streaming传入一个处理一个,吞吐量相对小批处理,吞吐量大实现真正流式实时的处理数据,延迟在秒级以下,实时性很高本质还是批量处理,在短的时间窗口内进行数据实时处理,延迟在秒级左右,实时性相对较弱每个单独的记录当它通过系统时必须被跟踪,所以Storm能够至少保证每个记录将被处理一次,但是在从错误中恢复过来时候允许出现重复记录

    43、能够保证每个批处理的所有数据只处理一次,保证数据不会在恢复的时候错乱(批处理重新执行)Clojure语言开发Scala语言开发提供java API提供java、python API2019年开始就在Twitter内部生产环境中使用,已经非常成熟2019年才陆续有一些公司开始试用,但最近越来越多公司已经在生产中使用PPT模板下载:1ppt/moban/Spark Streaming工作原理Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,最终得到处理后的一批批结果数据。处理流程:1.Spark Streaming把实时输入数据流以时间

    44、片t(如1秒)为单位切分成块2.Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据3.每个块都会生成一个Spark Job处理4.最终结果也返回多块PPT模板下载:1ppt/moban/Spark Streaming编程入口:StreamingContext 是Spark Streaming的入口。val conf=new SparkConf().setMaster(local2).setAppName(NetworkWordCount)val ssc=new StreamingContext(conf,Seconds(1)/已有spark contex

    45、t的情况val ssc=new StreamingContext(sc,Seconds(1)Python:Sc=SparkContext(“local2”,”NetworkWordCount”)Ssc=StreamingContext(sc,1)步骤:1.创建DStream对象,并定义好输入数据源。2.基于数据源DStream定义好计算逻辑和输出。3.调用streamingContext.start()启动接收并处理数据。4.调用streamingContext.awaitTermination()等待流式处理结束。5.你可以主动调用 streamingContextssc.stop()来手动

    46、停止处理流程。注意点:1.一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。2.一旦streamingContext被stop掉,就不能restart。3.单个JVM虚机同一时间只能包含一个active的StreamingContext。PPT模板下载:1ppt/moban/Spark Streaming编程Spark-shell中:import org.apache.spark._import org.apache.spark.streaming._val ssc=new StreamingContext(sc,Seconds(1)/创建StreamingCont

    47、ext,批次间隔为1秒val lines=ssc.socketTextStream(“192.168.5.2,9998)/创建一个连接到hostname:port的Dstreamval words=lines.flatMap(_.split(“”)/将每一行分割成多个单词val pairs=words.map(word=(word,1)/对每一批次中的单词进行计数val wordCounts=pairs.reduceByKey(_+_)wordCounts.print()/将该DStream产生的RDD的头十个元素打印到控制台上ssc.start()/启动流式计算ssc.awaitTermin

    48、ation()/等待直到计算终止Python:From pyspark import SparkComtextFrom pyspark.streaming import StreamingContextWords=lines.flatMap(lambda line:line.split(“”)/将每一行分割成多个单词Pairs=words.map(lambda word:(word,1)/对每一批次中的单词进行计数wordCounts=pairs.reduceByKey(lambda x,y:x+y)wordCount.pprint()/将该DStream产生的RDD的头十个元素打印到控制台上

    49、PPT模板下载:1ppt/moban/离散数据流离散数据流(DStreams)Dstream:离散数据流是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都是不可变、分布式的数据集。每个RDD都包含了特定时间间隔内的一批数据。任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。例如,在前面的例子中,我们将 lines 这个DStream转成words DStream对象,其实作用于lines上的flatMap算子,会施加于lines中的每个R

    50、DD上,并生成新的对应的RDD,而这些新生成的RDD对象就组成了words这个DStream对象。PPT模板下载:1ppt/moban/Transformation算子Transformation用途用途transform(func)返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作updateStateByKey(func)返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。Transform算子

    展开阅读全文
    提示  163文库所有资源均是用户自行上传分享,仅供网友学习交流,未经上传用户书面授权,请勿作他用。
    关于本文
    本文标题:spark分享分析-共48页课件.ppt
    链接地址:https://www.163wenku.com/p-3592688.html

    Copyright@ 2017-2037 Www.163WenKu.Com  网站版权所有  |  资源地图   
    IPC备案号:蜀ICP备2021032737号  | 川公网安备 51099002000191号


    侵权投诉QQ:3464097650  资料上传QQ:3464097650
       


    【声明】本站为“文档C2C交易模式”,即用户上传的文档直接卖给(下载)用户,本站只是网络空间服务平台,本站所有原创文档下载所得归上传人所有,如您发现上传作品侵犯了您的版权,请立刻联系我们并提供证据,我们将在3个工作日内予以改正。

    163文库