美文网首页
spark chapter 3 RDD

spark chapter 3 RDD

作者: 深海suke | 来源:发表于2019-08-10 16:41 被阅读0次

    # 1 什么是RDD

    *Resilient Distributed Dataset弹性分布式数据集

    *Represents an immutable(不可变)partitioned(分区的)collection of elements that can be operated on in parallel(并行操作). 

    * 操作包括: `map`, `filter`, and `persist`(持久化)

    *有KV,Double,Sequence文件类型

    什么是不可变?——>对于变量不可修改,每次操作之后都会生成一个新的变量。

    spark编程和scala类似,可以无缝对接

    ```

    abstract class RDD[T: ClassTag](

        @transient private var _sc: SparkContext,

        @transient private var deps: Seq[Dependency[_]]

      ) extends Serializable with Logging

    ```

    1)抽象类,只有在子类里面实现了(子类继承父类)

    2)带泛型的,可以支持多种类型(string,person,user)

    单机存储/计算==>分布式存储/计算

    1数据存储:切割  HDFS BLock

    2数据计算:切割(并行计算)Mapreduce、spark

    3存储和计算:HDFS/S3 + Mapreduce/Spark

    # 2 RDD特性

    【面试考点】Internally, each RDD is characterized by five main properties:

    - A list of partitions一些列分区分片

    - A function for computing each split可以去计算每个分区分片

    rdd.map(_+1)->对每个分区做了一个相同的操作

    - A list of dependencies on other RDDs rdd是具有依赖关系的【重要特点】

    rdd1==>rdd2==>rdd3==>=rdd4

    rdd1有五个分区,则rdd2也有五个分区。如果rdd2第四个分区丢失,则数据会从rdd1第四个分区再次计算,而不会对所有数据进行计算。

    - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)。可选属性,KV分区可以按照不同策略去设置分区hash or range.etc

       - Optionally, a list of preferred locations to compute each split on (e.g. block locations for

         an HDFS file)可选属性,数据在哪儿可以把作业调度在数据所在的节点进行计算:数据移动不如移动作业

    用户可自己创建RDD,从文件中读取数据

    * refer课程:hadoop入门实战课程

    # 3 RDD特性在源码中的体现

    RDD有五个方法:

    ## 1 computer-特性2

    > DeveloperApi ——开发API

    > Implemented(实现)by subclasses(子类)to compute a given partition.

    ```

    def compute(split: Partition, context: TaskContext): Iterator[T]

    ```

    * parition源码

    ```

    package org.apache.spark

    // An identifier for a partition in an RDD.

    trait Partition extends Serializable {

      //Get the partition's index within its parent RDD

      def index: Int

      // A better default implementation of HashCode

      override def hashCode(): Int = index

      override def equals(other: Any): Boolean = super.equals(other)

    }

    ```

    面试问题:

    1为什么重写equals方法的时候需要重写hashcode

    2如何实现hashset/hashmap

    ## 2 getPartitions-特性1

    ```

     protected def getPartitions: Array[Partition]

    ```

    This method will only be called once, so it is safe to implement a time-consuming computation in it.

    ## 3 getDependencies-特性3

    ```

     protected def getDependencies: Seq[Dependency[_]] = deps

    ```

    返回一堆依赖关系

    ## 4 getPreferredLocations-特性5

    ```

    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

    ```

    ## 5 partitioner-特性4

    ```

    val partitioner: Option[Partitioner] = None

    ```

    ## + hadoop RDD的实现

     :: DeveloperApi ::

    An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).

    protected def getInputFormat(conf: JobConf): InputFormat[K, V]  ——new出来输入对象,然后匹配是否符合输入,然后返回。

    override def getPartitions: Array[Partition] ——调用getinputformat,然后分块,检查是否为空,返回非空。

    对于上面讲的在开发API中这些方法的实现没有理解

    ## + JDBC RDD的实现

    也存在 上述的集中RDD支持的操作

    ---

    两个实现的方式是说每个实现里会有这几种基本方式的实现,但是具怎么实现,实现原理是依赖于其本身的设计还是spark RDD的设计还是不理解。

    logging属于日志接口

    def this是附属构造器

    #  4图解RDD

    # 5 Spark ContextSpark Conf

    1创建SparkContext

    链接到Spark集群,可以跑在local、standalone、yarn、mesos

    可以通过SC创建RDD,或者广播变量到集群

     在创建sparkcontext之前,要创建spark conf

    不要硬编码,最好是通过submit传入进来

    启动pyshark(在python的bin目录里,可以配置环境变量后,支持全局可用)

    ```

    ./pyshark

    ```

    在创建spark的时候初始化了sc,sc的master为local[*]

    可以spark000:8080查看运行状况

    也可以添加其他属性

    ```

    ./bin/pyshark --master local[4] --py-files code.py

    ```

    用ipython启动

    ```

    $ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

    ```

    sc.sparkconf

    # 6 pyshark脚本解析

     可以用 查询相关参数属性

    ./pyshark --help

    传输参数

    ./pyshark --master local[2]

    ./pyshark --name  "Pyskark" #定义名字

    # 7 RDD 创建方式

    Once created, distFile can be acted on by dataset operations.

    ## 方法 1 driver

    Parallelized Collections,测试场景下使用较多

    ```

    data =  [1,2,3,4,5]

    disData = sc.parallelize(data) #把data转换成RDD,默认转换成两个部分

    disData.collect() #查看数据里面具体是什么

    disData.reduce( lambda a , b : a+b ) #加和所有元素(?那对部分相加怎么办?)

    ```

    ps:collect,reduce才会触发jobs

    One important parameter for parallel collections is the number of*partitions*to cut the dataset into.——也就是说,我们的数据是可以切分的。例子如下:

    ```

    disData = sc.parallelize(data,5) #这里的参数5是表示把RDD切成5个部分。

    ```

    多分数据和业务逻辑和处理性能有关。

    每个CPU可以设置2-4个partition

    ## 方法 2 外部数据导入

    外部存储导入

    支持读取的数据源格式:HDFS, HBase,S3,Hive,etc

    支持文件类型:text files, SequenceFiles or any other Hadoop InputFormat or any data source offering a Hadoop InputFormat

    ```

    # 先在目录下创建hello.txt文件,内容可自定义,记录下其村吃地址

    disFile = sc.textFile("file://本地文件路径")

    # 本地文件

    disFile = sc.textFile("file:///home/hadoop/data/hello.txt")

    disFile.collection()

    #hadoop文件

    disFile = sc.textFile("hdfs://hadoop000:8020/hello.txt")

    disFile.collection()

    #使用示例,求和文件中字段的长度并加和

    disFile.map(lambda s:len(s)).reduce(lambda a,b:a+b)

    ```

    *If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.

    注意点:如果你的文件在本地,要确保你的运行程序在运行的节点上可以访问本地。一般建议用网路的共享文件系统(use a network-mounted shared file system,类似于S3)?这就是说 如果在hdfs 上 也算非本地把?

    1)上课环境是单节点,hello.txt本地有就可以读取

    2)如果在standalone:Spark集群上:3个节点,local path都是表示节点本地读取文件。不建议

    3)生产上直接用yarm,不会用standalone

    * 支持整个文件夹,或者压缩文件。

    ```

    sc.textFile("/my/directory")

    sc.textFile("/my/directory/*.gz")

    ```

    * 可以设置分区(partitions)数量,可以设置更多,单无法设置更少。

    ## python特有的支持类型

    *SparkContext.wholeTextFiles

    支持全路径的文件读取,key是文件村存储地址,value是文件内容

    ```

    sc.wholeTextFiles("file:///home/hadoop/data/hello.txt").collect()

    # 读取内容

    [(文件路径,文件内容)]

    ```

    * RDD.saveAsPickleFile——可以保存为这种文件类SparkContext.pickleFile

    * SequenceFile and Hadoop Input/Output Formats

    示例 1 序列

    ```

    rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))

    rdd.saveAsSequenceFile("path/to/file")

    sorted(sc.sequenceFile("path/to/file").collect())

    [(1, u'a'), (2, u'aa'), (3, u'aaa')]

    ```

    示例2

    ```

    $ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar

    >>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults

    >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",

                                 "org.apache.hadoop.io.NullWritable",

                                 "org.elasticsearch.hadoop.mr.LinkedMapWritable",

                                 conf=conf)

    >>> rdd.first()  # the result is a MapWritable that is converted to a Python dict

    (u'Elasticsearch ID',

     {u'field1': True,

      u'field2': u'Some Text',

      u'field3': 12345})

    ```

    不知道什么是es

    示例3 存储数据

    ```

    data =  [1,2,3,4,5]

    disData = sc.parallelize(data)

    disData.saveAsTextFile('file:///home/hadoop/data/output/)

    ```

    # 8 Spark 应用程序开发并运行

    IDE:IDEA pycharm

    1 建立一个新的项目

    2 建立一个py文件

    3对py文件的configureations中添加环境变量

    添加spark中pyhton的环境变量

    添加spark本身的环境变量

    4在preference里面的project structure的add content root添加python-lib下的py4j和pyshark两个zip包

    5使用local进行本地测试

    6提交pyshark程序

    ```

     ./spark - submit --master local[2]  -name spark -py python文件名

    ```

    # 9 程序运行coding小笔记

    显示当前全部java进程pid的命令jps 

    创建hello.txt并对其进行编辑vi hello.txt 

    添加环境变量:vi ~/.bashrc

    查看一个文本文件cat hello.txt

    hdfs文件传输

    ```

    hadoop fs -put /hello.txt

    hadoop fs -text /hello.txt

    ```

    ?不懂什么时候需要/ 什么时候~/ 什么时候./

    refer课程:10个小时入门hadoop

    相关文章

      网友评论

          本文标题:spark chapter 3 RDD

          本文链接:https://www.haomeiwen.com/subject/gebrjctx.html