美文网首页
spark入门

spark入门

作者: 全然大好人 | 来源:发表于2017-10-27 18:44 被阅读0次

    spark(1)介绍

    1. 快速且通用的集群计算平台
    • 扩充了流行的MapReduce计算模型
    • 基于内存(发现hadoop在迭代式计算和交互式上的低效)
    • 融合优点:批处理(hadoop)、迭代式运算(机器学习系统)、交互式查询(Hive)、流处理(Storm),降低了成本
    • 和其他大数据工具整合很好,如hadoop、kafka
    2. spark组件
    组件 Spark Core Spark SQL Spart Streaming Spart MLlib GraphX Cluster Managers 紧密集成
    3. Hadoop Spark 差异
    • Hadoop离线处理,对时效性要求不高
    • Spark用于时效性要求高及机器学习等领域
    • Spark不具备HDFS的存储能力,要借助HDFS等持久化数据

    spark(2)下载安装、shell

    1. 虚拟机联网
    2. 下载
    3. 解压
    • tar -zxvf spark-1.6.3-bin-hadoop2.6.tgz
    4. 目录
    • bin/spark shell(交互性、实时性)pyspark和spark-shell
    • core、streaming、python主要是组件源代码
    • example包含单机Spark job
    shell
    5. scala shell
    • 读取文件
    val line = sc.textFile("../../testfile/helloSpark")     //加载文本文件,返回RDD
    line.count()    //计算行数
    line.first()     //第一行
    
    6. 修改日志级别
    • 日志输出太多,改为显示WARN日志
      conf/log4j
    cp log4j.properties.template  log4j.properties
    vi  log4j.properties
    log4j.rootCategory=WARN,console(wq)
    

    Spark(3)开发环境

    1. Scala安装
    • Spark 2.0 —— Scala 2.11
    2. IDEA安装
    3. IDEA插件
    • Scala
    • SBT
    4. 新建项目
    • Scala项目,用SBT打包
      Scala 2.10.5 Spark 1.6.2 SBT 0.13.8 JDK 1.8
    5. ssh无密码登录
    • ssh localhost发现要输密码
    ssh-keygen
    touch authorized_keys(.ssh下)
    cat id_rsa.pub > authorized_keys
    chmod 600 authorized_keys
    
    • ssh localhost试验是否登录成功
    6. 第一个程序WordCount
    1. 创建一个Spark Context
    2. 加载数据
    3. 把每一行分割成单词
    4. 转换成pairs并且计数
    配置
    • 新建Scala类(object类型)
    程序
    • build,打成jar包
    7. 启动集群
    1. (spark文件夹下)启动master ./sbin/start-master.sh
    2. 启动worker ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost.localdomain:7077
      worker上打开localhost:8080,得到 ://localhost.localdomain:7077
    3. 提交作业 ./bin/spark-submit --master spark://localhost.localdomain:7077 --class WordCount /home/maixia/soft/imoocpro.jar

    Spark(4)RDDS

    1.Driver program
    • 包含程序的main()方法,RDDs的定义和操作
    • 管理很多结点(executors)
    2. SparkContext
    • Driver programs 通过SparkContext对象访问Spark
    • SparkContext对象代表和一个集群的连接
    • 在Shell中SparkContext自动创建好了就是sc
    3. RDDs基础
    • Resilient distributed datasets(弹性分布式数据集)
    • 并行分布在整个集群中,是Spark分发数据和计算的基础抽象类
    • 代表一个不可改变的分布式集合对象,Spart中所有计算都通过RDDs的创建、转换、操作完成
    • 一个RDD内部有很多partitions(分片)组成,每个分片包括一部分数据,partitions可在集群不同节点上计算。
    • 分片是Spark并行处理的单元,Spark顺序地、并行地处理分片
    4. RDDs创建
    1. val rdd = sc.parallelize(Array(1,2,2,4),4) 测试用
      rdd.foreach(println)
    2. val rddText = sc.textFile("helloSpark.txt") 加载外部数据集
    5. Scala匿名函数 类型推断

    lines.filter(line => line.contains("world"))

    • 定义一个匿名函数,接受一个参数line
    • 使用line这个String类型变量上的contains方法,并且返回结果
    • line的类型不需要指定,能够推断出来
    6. Transformation
    • 从一个RDD构建一个新的RDD,如map()和filter()
    1. 逐元素的Transformation
    • map(),接收函数,把函数应用到RDD的每一个元素,返回新RDD
    • filter(),接收函数,返回只包含满足filter()函数的元素的新RDD
    • flatMap(),对每个输入元素,输出多个输出元素,将RDD中元素压扁后返回一个新的RDD
    1. 集合运算
    • 并集:rdd1.union(rdd2)
    • 交集:rdd1.intersection(rdd2)
    • 差集:rdd1.subtract(rdd2)
    • 去重:rdd.distinct()
    7. Action
    • 在RDD上计算出一个结果,把结果返回给driver program或保存在文件系统
    1 2
    • reduce(),接受一个函数,作用在RDD两个类型相同的元素上,返回新元素,可以实现RDD中元素累加、计数等聚集操作
    • collect(),遍历整个RDD,向driver program返回RDD的内容(需要单机内存能够容纳下,因为数据要拷贝给driver,测试使用,大数据时使用saveAsTextFile())
    • take(n),返回RDD的n个元素,同时尝试访问最少的partitions(返回结果无序,测试使用)
    • top(),排序(根据RDD中数据的比较器)
    • foreach(),计算RDD中每个元素,但不返回到本地(一般配合println打印数据,方便测试)
    8. keyValue对RDDs
    • 使用map()函数,返回key/value对(包含整行数据的RDD,把每行数据的第一个单词作为key)
    val rdd2 = rdd1.map(line=>(line.split(" ")(0),line))
    
    1 2
    9. combineByKey()
    • 最常用的基于key的聚合函数,返回类型可以与输入类型不一样
    • 遍历partition中的元素,对于没见过的key使用createCombiner()函数,对于见过的使用mergeValue()函数,合计每个partition的结果时使用mergeCombiners()函数
    //把成绩相加再求平均值
    val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
    val average=scores2.map{case(name,(num,score))=>(name,score/num)}
    
    10. Spark特性
    • 血统关系
    血统关系图
    • 延迟计算(Lazy Evaluation)
    • action时才会真正计算,减少数据的传输
    • 内部记录metadata表明transformations操作已被响应
    • 加载数据也是延迟计算,数据只有在必要时候才会加载进去
    • RDD.persist(),默认每次在RDD上进行action操作时,Spark都重新计算一遍,如需可重用使用RDD.persist(),unpersist()从缓存中移除
    参数 参数
    • SER表示序列号,对CPU占用高

    相关文章

      网友评论

          本文标题:spark入门

          本文链接:https://www.haomeiwen.com/subject/ctfdpxtx.html