美文网首页
Spark简述

Spark简述

作者: 上山走18398 | 来源:发表于2019-07-09 07:21 被阅读0次

    Spark 简介

    Spark是什么
    一个快速且通用的集群计算平台
    
    Spark特点:
     1. Spark扩充了流行的Mapreduce计算模型
     
     2. Spark是基于内存的计算(一些中间数据存储在内存中,加快读写)
     
     
    Spark 是通用的
        Spark的设计容纳了其他分布式系统拥有的功能
        
        批处理,迭代式计算,交互查询和流处理等
        
    Spark是高度开放的
    
        Spark提供了Python Java Scala Sql的API和丰富的内置库
        Spark和其他的大数据工具整合的很好,包括Hadoop、kafaka等
    
    1. Spark生态圈
    最初是基于Hadoop MapReduce的
    发现MapReduce在迭代式计算和交互式上低效,引入了内存存储(重大的区别)
    
    Spark的组件:
    Spark SQL structured data:
    是Spark处理结构化数据的库,就像Hive SQL,Mysql一样--报表统计
    Spark Streaming real-time:
    是实时数据流处理组件,类似Storm
    Spark Streaming提供了API来操作实时流数据
    应用场景,企业中用来从Kafka接收数据做实时统计
    
    
    
    
    Mlib machine learning
    
    一个包含通用机器学习功能的包, Machine learning lib
    包含分类,聚类,回归等,还包括模型评估,和数据导入
    
    MLib提供的上面这些方法,都支持集群上的横向扩展
    
    
    
    
    
    Graphx graph processing
    
    是处理图的库(例如,社交网络),并进行图的并行计算
    继承了RDD API
    它提供了各种图的操作和常用的图算法,例如PangeRank算法
    应用场景,图计算
    
    ---------------------
    Spark Core
    包含了Spark的基本功能,包含任务调度,内存管理,容错机制
    内部定义了RDDs(弹性分布式数据集)
    提供了很多APPIs来创建和操作这些RDDs
    应用场景,为其他组件提供底层服务
    
    ---------------------
    Cluster Managers:
        
        就是集群管理,Spark自带一个集群管理是单独调度器
        常见集群管理包括Hadoop YARN, Apache Mesos
    
    紧密集成的优点
    Spark底层优化了,基于Spark底层的组件,也得到了相应的优化
    
    紧密集成,节省了各个组件组合使用时的部署,测试等时间
    
    向Spark增加新的组件,其它组件时,可立刻享用新组件的功能
    
    1. Spark 和 Hadoop比较
    Hadoop应用场景:
    离线处理
    
    对时效性要求不高:中间数据会落在硬盘上(几分钟到几小时不等)
    
    Spark应用场景:
    时效性要求高的场景(几秒钟到几分钟不等)
    
    机器学习等领域
    基于内存
    
    比较:
    这是生态系统,每个组件都有其作用,各司其职即可
    
    Spark不具有HDFS的存储能力,要借助HDFS等持久化数据。
    大数据将会孕育出更多的新技术
    

    Spark安装
    数据处理????

    Spark运行环境:
    Spark是Scala写的,运行在JVM上,所有运行环境是Java7+
    python 2.6+ 3.7+
    
    
    Spark的Shell:
    Spark的Shell使你能够处理分布在集群上的数据
    
    Spark把数据加载到节点的内存中,因此分布式处理可在秒级完成
    
    快速迭代式计算,实时查询,分析一般能够在shells中完成
    
    Spark提供了Python shells 和 Scala shells
    
    python shell :
     bin/pyspark
     bin/spark-shell
     
     spark是用Scala写的???
     网络问题 版本问题
    

    Rdds

    RDDs介绍

    Driver programe
     包含程序main()方法,RDDs定义和操作
     它管理很多节点,我们称作executors
     
    SparkContext:
     driver programs 通过SparkContext对象访问Spark
     SparkContext对象代表和一个集群的连接
     在shell中SparkContext自动创建好了,就是sc变量(直接调用即可)
     scala> val lines=sc.textFile(../xx)
    -----------------------------
    
    RDDs:
    1. Resilient distributed datasets(弹性分布式数据集,简写RDDs)
    2. 这些RDDs,并行的分布在整个集群中
    eg。1
    scala> val lines=sc.textFile(../xx)
    假设这个文件有500g ,对其切分分布在5个节点上
    通过lines就可以知道文件存放在哪,并操作(lines就是RDDs)
    3. RDDs是Spark分发数据和计算的基础抽象类
    
        1.1 一个RDD是一个不可改变的分布式集合对象(相当于文件被占用,不可修改)
        1.2 Spark中,所有的计算都是通过RDDs的创建,转换,操作完成的。
        1.3 一个RDD内部由许多partitions(分片)组成
        
    分片:
    每个分片包括一部分数据,partitions可在集群不同节点上计算
    
    分片是Spark并行处理的单元,Spark顺序的,并行的处理分片
    
    RDDs的创建方法:
    一、 把一个存在的集合传给SparkContext的parallelize()方法,测试用
    val rdd = sc.parallelize(Array(1,2,2,4),4)
    第一个参数:待并行化处理的集合,第2个参数:分区个数
    
    二、 加载外部数据集
    
    val rddText = sc.textFile(“xxx.txt”)
    
    
    Scala基础语法:
    val  var创建变量
    val:不可修改
    var:可变
    
    Scala的匿名函数和类型推断
    

    RDD基本操作之Transformation

    Transformation介绍:
    
    从之前RDD构建一个新的RDD,像map()和filter()
    
    逐元素Transformation
     map()
     
     map接收函数,把函数应用到RDD的每一个元素,返回新RDD
     
     val lines = lines.map(word=>(word,1))
     
     filter() 
     
     filter()接收函数,返回只包含满足filter()函数的元素的新rdd
     
     
     
     val lines =lines.filter(word=>word.contains(‘hello’))
     
     flatMap()
     
     
     val inputs = sc.TextFile("xxx.txt")
     
     val lines = inputs.flatMap(line=>line.split(" "))
     
     
    集合运算
    并集 交集等
    
    val rdd1=sc.parallelize(Array("coffe","coffe","panda","monkey","tea"))
    
    val rdd2=sc.parallelize(Array("coffe","coffe","panda","monkey","cat"))
    
    val rdd_distinct=rdd1.distinct(rdd2)
    
    val rdd_union =rdd1.union()
    
    val rdd_inter=rdd.intersection(rdd2)
    
    val rdd_sub=rdd1.subtract(rdd2)
    
    RDD基本操作action,对返回结果对操作???
    Action介绍
    在RDD上计算出来一个结果
    把结果返回给driver program或保存在文件系统,count(),save
    
    reduce() //累加,计数,和其他类型的聚集操作
    rdd.reduce((x,y)=>x+y)
    
    Collect() 
    //遍历整个RDD,向driver program返回RDD内容
    //需要单机内存能够容纳下(因为数据要拷贝给driver,测试使用)
    大数据的时候,使用saveAsTextFile() action等
    
    take(n)
    返回RDD的n个元素(同时尝试访问最少的partitions)
    返回结果是无序的测试使用
    
    top()
    //排序
    
    foreach()
    //计算RDD中的每个元素,但不返回本地,不保存
    
    
    RDDs的特性
    RDDs的血统关系图:
     Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图
     
     Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据
     
    延迟计算(lazy evaluation)
    
    Spark对RDDs的计算是,他们第一次使用action操作的时候
    这种方式在处理大数据的时候特别有用,可以减少数据的传输
    Spark内部记录metadata表明transformation操作已被响应
    加载数据也是延迟计算,数据只有在必要的时候,才会被加载进去
    
    RDD.persist()
     默认每次在RDDs上进行action操作时,Spark都重新计算RDDs
     如果想重复利用一个RDD,可以使用RDD.persist()参数:内存,硬盘
     unpersist 从缓存中移除
    

    KeyValue对RDDs

    创建KeyValue对Rdds:
    使用Map函数 ,返回keyValue
    
    eg.1 
    使用map()函数,返回key/vlaule
    例如,包含数行数据对RDD,把每行数据数据的第一个单词作为keys
    
    
    val rdd2=rdd.map(line=>(line.split(" ")(0),line))
    
    KeyValue对RDDs的Transformations(example:{(1,2),(3,4),(5,6)})
    
    reduceBykey()
    groupByKey()
    .............
    
    
    combineByKey():
    (createCombiner,mergeValue,MergeCombiners,partitioner)
    最常用基于key的聚合函数,返回类型可以与输入类型不一样
    许多基于key的聚合函数都用到了它,像groupByKey()
    
    combineByKey():
    遍历partition中的元素,元素的key,要么之前见过,要么不是
    如果是新元素,使用我们提供的createCombiner()函数
    如果是这个partition中已经存在key,就会使用mergeValue()函数
    合计每个partition的结果的时候,使用mergeCombiner()函数
    

    相关文章

      网友评论

          本文标题:Spark简述

          本文链接:https://www.haomeiwen.com/subject/lyctkctx.html