hadoop-spark Big Data Processing Techniques, Chapter 1 (Part 2)


Author: Kean_L_C | Published 2019-04-15 23:06

    The previous section showed how to implement secondary sort with Java on Hadoop. This section implements secondary sort on Spark, first in Java and then in Scala.

    Starting Spark

    To make starting and stopping Spark a little easier, I wrote two simple scripts:

    [root@master Templates]# cat stop_spark_yarn.sh 
    # stop hadoop yarn spark
    $SPARK_HOME/sbin/stop-all.sh
    $HADOOP_HOME/sbin/stop-all.sh
    [root@master Templates]# cat start_spark_yarn.sh 
    # start hadoop yarn spark
    $HADOOP_HOME/sbin/start-all.sh
    $SPARK_HOME/sbin/start-all.sh
    [root@master Templates]# chmod 777 ../Templates/*
    

    After startup:

    [Screenshot: the cluster after startup]

    Maven dependencies

    Spark itself is written in Scala, so running Java code on Spark requires a few extra Maven dependencies (Spark core plus the Scala libraries):

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>kean.learn</groupId>
        <artifactId>hadoop_spark</artifactId>
        <version>1.0-SNAPSHOT</version>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
                <!--<plugin>-->
                    <!--<groupId>org.scala-tools</groupId>-->
                    <!--<artifactId>maven-scala-plugin</artifactId>-->
                    <!--<version>2.15.2</version>-->
                    <!--<executions>-->
                        <!--<execution>-->
                            <!--<goals>-->
                                <!--<goal>compile</goal>-->
                                <!--<goal>testCompile</goal>-->
                            <!--</goals>-->
                        <!--</execution>-->
                    <!--</executions>-->
                <!--</plugin>-->
            </plugins>
        </build>
    
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.3</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.3</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.8</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-compiler</artifactId>
                <version>2.11.8</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-reflect</artifactId>
                <version>2.11.8</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-math3 -->
            <!--<dependency>-->
                <!--<groupId>org.apache.commons</groupId>-->
                <!--<artifactId>commons-math3</artifactId>-->
                <!--<version>3.6.1</version>-->
            <!--</dependency>-->
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-math -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math</artifactId>
                <version>2.2</version>
            </dependency>
        </dependencies>
    
        <repositories>
            <!-- code repository (Aliyun public mirror; Maven 3.8.1+ blocks plain-http repositories) -->
            <repository>
                <id>maven-ali</id>
                <url>https://maven.aliyun.com/repository/public</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                    <updatePolicy>always</updatePolicy>
                    <checksumPolicy>fail</checksumPolicy>
                </snapshots>
            </repository>
        </repositories>
    </project>
    

    The code below is taken directly from the book Data Algorithms (数据算法:Hadoop/Spark大数据处理技巧):

    package org.dataalgorithms.chap01.sparkwithlambda;
    
    // STEP-0: import required Java/Spark classes.
    
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;
    //
    import scala.Tuple2;
    //
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    //
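    // Helper classes from the book's source code (SparkUtil creates the JavaSparkContext,
    // DataStructures.merge merges two sorted maps); copy them into the project to compile.
    import org.dataalgorithms.util.SparkUtil;
    import org.dataalgorithms.util.DataStructures;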
    
    
    /**
     * SecondarySortUsingCombineByKey class implements the secondary sort design pattern
     * by using combineByKey().
     * <p>
     * <p>
     * Input:
     * <p>
     * name, time, value
     * x,2,9
     * y,2,5
     * x,1,3
     * y,1,7
     * y,3,1
     * x,3,6
     * z,1,4
     * z,2,8
     * z,3,7
     * z,4,0
     * p,1,10
     * p,3,60
     * p,4,40
     * p,6,20
     * <p>
     * Output: generate a time-series looking like this:
     * <p>
     * t1   t2   t3   t4  t5     t6
     * x => [3,  9,   6]
     * y => [7,  5,   1]
     * z => [4,  8,   7,   0]
     * p => [10, null, 60, 40, null , 20]
     * <p>
     * x => [(1,3), (2,9), (3,6)]            where 1 < 2 < 3
     * y => [(1,7), (2,5), (3,1)]            where 1 < 2 < 3
     * z => [(1,4), (2,8), (3,7), (4,0)]     where 1 < 2 < 3 < 4
     * p => [(1,10), (3,60), (4,40), (6,20)] where 1 < 3 < 4 < 6
     *
     * @author Mahmoud Parsian
     */
    public class SecondarySortUsingCombineByKey {
    
        public static void main(String[] args) throws Exception {
    
            // STEP-1: read input parameters and validate them
            if (args.length < 2) {
                System.err.println("Usage: SecondarySortUsingCombineByKey <input> <output>");
                System.exit(1);
            }
            String inputPath = args[0];
            System.out.println("inputPath=" + inputPath);
            String outputPath = args[1];
            System.out.println("outputPath=" + outputPath);
    
            // STEP-2: Connect to the Spark master by creating JavaSparkContext object
            final JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
    
            // STEP-3: Use ctx to create JavaRDD<String>
            //  input record format: <name><,><time><,><value>
            JavaRDD<String> lines = ctx.textFile(inputPath, 1);
    
            // STEP-4: create (key, value) pairs from JavaRDD<String> where
            // key is the {name} and value is a pair of (time, value).
            // The resulting RDD will be JavaPairRDD<String, Tuple2<Integer, Integer>>.    
            // convert each record into Tuple2(name, time, value)
            // PairFunction<T, K, V>    T => Tuple2(K, V) where K=String and V=Tuple2<Integer, Integer>
            System.out.println("===  DEBUG STEP-4 ===");
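            // Parse each line into Tuple2(name, Tuple2(time, value)).
            // tokens[0] keeps any leading whitespace: each line of the sample file below starts
            // with a space, which is why the output shows " p" rather than "p" (use s.trim() to avoid it).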
            JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = lines.mapToPair((String s) -> {
                String[] tokens = s.split(","); // x,2,5
                // System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
                Tuple2<Integer, Integer> timevalue = new Tuple2<>(Integer.parseInt(tokens[1]), Integer.parseInt(tokens[2]));
                return new Tuple2<>(tokens[0], timevalue);
            });
    
            // STEP-5: validate STEP-4, we collect all values from JavaPairRDD<> and print it.    
            // List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
            // for (Tuple2 t : output) {
            //     Tuple2<Integer, Integer> timevalue = (Tuple2<Integer, Integer>) t._2;
            //     System.out.println(t._1 + "," + timevalue._1 + "," + timevalue._2);
            // }
    
            // How to use combineByKey(): to use combineByKey(), you 
            // need to define 3 basic functions f1, f2, f3:
            // and then you invoke it as: combineByKey(f1, f2, f3)
            //    function 1: create a combiner data structure 
            //    function 2: merge a value into a combined data structure
            //    function 3: merge two combiner data structures
    
    
            // function 1: create a combiner data structure         
            // Here, the combiner data structure is a SortedMap<Integer,Integer>,
            // which keeps track of (time, value) for a given key
            // Tuple2<Integer, Integer> = Tuple2<time, value>
            // SortedMap<Integer, Integer> = SortedMap<time, value>, sorted by time in ascending order (TreeMap's default)
            Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> createCombiner = (Tuple2<Integer, Integer> x)
                    -> {
                Integer time = x._1;
                Integer value = x._2;
                SortedMap<Integer, Integer> map = new TreeMap<>();
                map.put(time, value);
                return map;
            };
    
            // function 2: merge a value into a combined data structure
            Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> mergeValue
                    = (SortedMap<Integer, Integer> map, Tuple2<Integer, Integer> x) -> {
                Integer time = x._1;
                Integer value = x._2;
                map.put(time, value);
                return map;
            };
    
            // function 3: merge two combiner data structures
            Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>> mergeCombiners
                    = (SortedMap<Integer, Integer> map1, SortedMap<Integer, Integer> map2) -> {
                // pass the smaller map first, as in the first branch, so it is merged into the larger one
                if (map1.size() < map2.size()) {
                    return DataStructures.merge(map1, map2);
                } else {
                    return DataStructures.merge(map2, map1);
                }
            };
    
            // STEP-6: combine the (time, value) pairs of each key into a SortedMap ordered by time
            JavaPairRDD<String, SortedMap<Integer, Integer>> combined = pairs.combineByKey(
                    createCombiner,
                    mergeValue,
                    mergeCombiners);
    
            // STEP-7: validate STEP-6, we collect all values from JavaPairRDD<> and print it.    
            // System.out.println("===  DEBUG STEP-6 ===");
            // List<Tuple2<String, SortedMap<Integer, Integer>>> output2 = combined.collect();
            // for (Tuple2<String, SortedMap<Integer, Integer>> t : output2) {
            //     String name = t._1;
            //     SortedMap<Integer, Integer> map = t._2;
            //     System.out.println(name);
            //     System.out.println(map);
            // }
    
            // persist output
            combined.saveAsTextFile(outputPath);
    
            // done!
            ctx.close();
    
            // exit
            System.exit(0);
        }
    
    }
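The three functions passed to `combineByKey()` are easier to picture with a small simulation. The sketch below mimics, in plain Java with no Spark dependency, how Spark applies them: `createCombiner` and `mergeValue` run inside each partition, and `mergeCombiners` merges the per-partition maps of the same key after the shuffle. The two-partition split of the sample data is arbitrary, and `mergeCombiners` here is a hypothetical stand-in for the book's `DataStructures.merge` (it copies the smaller map into the larger one, overwriting a duplicate time), not its actual implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class CombineByKeySketch {

    // function 1 (createCombiner): start a sorted (time -> value) map for a key
    static SortedMap<Integer, Integer> createCombiner(int[] timeValue) {
        SortedMap<Integer, Integer> map = new TreeMap<>();  // TreeMap keeps times in ascending order
        map.put(timeValue[0], timeValue[1]);
        return map;
    }

    // function 2 (mergeValue): add one more (time, value) to an existing map
    static SortedMap<Integer, Integer> mergeValue(SortedMap<Integer, Integer> map, int[] timeValue) {
        map.put(timeValue[0], timeValue[1]);
        return map;
    }

    // function 3 (mergeCombiners): hypothetical stand-in for DataStructures.merge,
    // copying the smaller map into the larger one (a duplicate time is overwritten)
    static SortedMap<Integer, Integer> mergeCombiners(SortedMap<Integer, Integer> m1,
                                                      SortedMap<Integer, Integer> m2) {
        SortedMap<Integer, Integer> smaller = m1.size() < m2.size() ? m1 : m2;
        SortedMap<Integer, Integer> larger = (smaller == m1) ? m2 : m1;
        larger.putAll(smaller);
        return larger;
    }

    // Simulates combineByKey: functions 1 and 2 run within each partition,
    // function 3 merges the per-partition maps of the same key afterwards.
    static SortedMap<String, SortedMap<Integer, Integer>> combineByKey(List<List<String>> partitions) {
        SortedMap<String, SortedMap<Integer, Integer>> result = new TreeMap<>();
        for (List<String> partition : partitions) {
            Map<String, SortedMap<Integer, Integer>> local = new TreeMap<>();
            for (String line : partition) {
                String[] t = line.trim().split(",");  // name, time, value
                int[] timeValue = {Integer.parseInt(t[1]), Integer.parseInt(t[2])};
                SortedMap<Integer, Integer> c = local.get(t[0]);
                local.put(t[0], c == null ? createCombiner(timeValue) : mergeValue(c, timeValue));
            }
            for (Map.Entry<String, SortedMap<Integer, Integer>> e : local.entrySet()) {
                SortedMap<Integer, Integer> prev = result.get(e.getKey());
                result.put(e.getKey(), prev == null ? e.getValue() : mergeCombiners(prev, e.getValue()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // the sample input, split by hand into two partitions
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("x,2,9", "y,2,5", "x,1,3", "y,1,7", "y,3,1", "x,3,6", "z,1,4"),
                Arrays.asList("z,2,8", "z,3,7", "z,4,0", "p,1,10", "p,3,60", "p,4,40", "p,6,20"));
        // prints {p={1=10, 3=60, 4=40, 6=20}, x={1=3, 2=9, 3=6}, y={1=7, 2=5, 3=1}, z={1=4, 2=8, 3=7, 4=0}}
        System.out.println(combineByKey(partitions));
    }
}
```

Key `z` is split across both partitions here, so its final map is produced by `mergeCombiners`, which is the case the third function exists for.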
    

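    Submitting the job with spark-submit

    Note that --master local[2] runs the job in local mode with two threads; to run it on the YARN cluster started above, use --master yarn (with HADOOP_CONF_DIR pointing at the Hadoop configuration).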

    [root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/input2/timeseries.txt
     x,2,9
     y,2,5
     x,1,3
     y,1,7
     y,3,1
     x,3,6
     z,1,4
     z,2,8
     z,3,7
     z,4,0
     p,1,10
     p,3,60
     p,4,40
     p,6,20
    [root@master bin]# ./spark-submit --master local[2]  --class org.dataalgorithms.chap01.sparkwithlambda.SecondarySortUsingCombineByKey /root/Data/data_algorithms/chapter1/hadoop_spark-1.0-SNAPSHOT.jar /data_algorithms/chapter1/input2 /data_algorithms/chapter1/output2
    [root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/out*/p*
    ( p,{1=10, 3=60, 4=40, 6=20})
    ( x,{1=3, 2=9, 3=6})
    ( y,{1=7, 2=5, 3=1})
    ( z,{1=4, 2=8, 3=7, 4=0})
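The Javadoc's Output section describes a dense time series such as `p => [10, null, 60, 40, null, 20]`, but the job actually writes the sparse sorted map (`{1=10, 3=60, 4=40, 6=20}`). If the dense form is wanted, each map could be expanded in a post-processing step. This is a minimal plain-Java sketch, assuming times are positive integers starting at 1; `TimeSeriesSketch` and `toTimeSeries` are hypothetical names, not part of the book's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class TimeSeriesSketch {

    // Expands a sorted (time -> value) map into a dense list for t1..tmax,
    // with null where no value was recorded (assumes times start at 1).
    static List<Integer> toTimeSeries(SortedMap<Integer, Integer> timeToValue) {
        List<Integer> series = new ArrayList<>();
        if (timeToValue.isEmpty()) {
            return series;
        }
        for (int t = 1; t <= timeToValue.lastKey(); t++) {
            series.add(timeToValue.get(t));  // get() returns null for a missing time
        }
        return series;
    }

    public static void main(String[] args) {
        SortedMap<Integer, Integer> p = new TreeMap<>();
        p.put(1, 10);
        p.put(3, 60);
        p.put(4, 40);
        p.put(6, 20);
        System.out.println("p => " + toTimeSeries(p));  // p => [10, null, 60, 40, null, 20]
    }
}
```

In the Spark job it could presumably be applied as `combined.mapValues(TimeSeriesSketch::toTimeSeries)` before `saveAsTextFile`; this has not been run on a cluster.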
    
    

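    The code is easy to follow, especially if you know a little about Scala tuples. Writing Spark code in Java feels somewhat awkward, a bit like mixing two languages. I had looked at some Scala before, so I decided to learn Scala again alongside Spark; hopefully this time it won't end as "from beginner to giving up".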

    Scala

    build.sbt

    name := "hadoop_spark_scala"
    
    version := "0.1"
    
    scalaVersion := "2.11.8"
    
    //resolvers ++= Seq( // extra repositories
    //  "Admonitor Repository" at "http://maven.mzsvn.com/repository/admonitor",
    //  "Local Maven Repository" at "local-maven:file://D:/java_workspace/repository"
    //)
    
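    libraryDependencies ++= Seq( // dependencies
      // %% appends the Scala binary version (_2.11) so it matches scalaVersion;
      // 2.2.1 matches the Spark version the job runs on (see the log below)
      "org.apache.spark" %% "spark-core" % "2.2.1",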
      "org.apache.hadoop" % "hadoop-common" % "2.7.3",
      "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "2.7.3"
    )
    
    // CustomPartitioner.scala
    package org.dataalgorithms.chap01.scala
    
    import org.apache.spark.Partitioner
    
    
    /**
      * A custom partitioner
      *
      * org.apache.spark.Partitioner:
      * An abstract class that defines how the elements in a
      * key-value pair RDD are partitioned by key. Maps each
      * key to a partition ID, from 0 to numPartitions - 1.
      */
    class CustomPartitioner(partitions: Int) extends Partitioner {
    
        require(partitions > 0, s"Number of partitions ($partitions) must be positive.")
    
        def numPartitions: Int = partitions
    
        def getPartition(key: Any): Int = key match {
            case (k: String, v: Int) => math.abs(k.hashCode % numPartitions)
            case null => 0
            case _ => math.abs(key.hashCode % numPartitions)
        }
    
        override def equals(other: Any): Boolean = other match {
            case h: CustomPartitioner => h.numPartitions == numPartitions
            case _ => false
        }
    
        override def hashCode: Int = numPartitions
    }
    
    // SecondarySort.scala
    package org.dataalgorithms.chap01.scala
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    /**
      * Spark/Scala solution to secondary sort
      *
      * @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
      * @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
      *
      */
    object SecondarySort {
    
        def main(args: Array[String]): Unit = {
            //
            if (args.length != 3) {
                println("Usage <number-of-partitions> <input-path> <output-path>")
                sys.exit(1)
            }
    
            val partitions = args(0).toInt
            val inputPath = args(1)
            val outputPath = args(2)
    
            val config = new SparkConf
            config.setAppName("SecondarySort")
            val sc = new SparkContext(config)
    
            val input = sc.textFile(inputPath)
    
            //------------------------------------------------
            // each input line/record has the following format:
            // <id><,><time><,><value>
            //-------------------------------------------------
            // key = (name-time, value), value = value: name and time are concatenated into one string
            val valueToKey = input.map(x => {
                val line = x.split(",")
                ((line(0) + "-" + line(1), line(2).toInt), line(2).toInt)
            })
    
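            // Descending ordering on the (String, Int) key; as a local implicit it is picked up by
            // repartitionAndSortWithinPartitions below instead of the default ascending tuple ordering.
            implicit def tupleOrderingDesc = new Ordering[Tuple2[String, Int]] {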
                override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]): Int = {
                    if (y._1.compare(x._1) == 0) y._2.compare(x._2)
                    else y._1.compare(x._1)
                }
            }
    
            val sorted = valueToKey.repartitionAndSortWithinPartitions(new CustomPartitioner(partitions))
    
            val result = sorted.map {
                case (k, v) => (k._1, v)
            }
    
            result.saveAsTextFile(outputPath)
    
            // done
            sc.stop()
        }
    }
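To see what `CustomPartitioner` and `tupleOrderingDesc` do to the composite keys, the sketch below ports both to plain Java (no Spark) and simulates `repartitionAndSortWithinPartitions` with 3 partitions on the sample input. It assumes each input line starts with a space, as the HDFS listing of `timeseries.txt` suggests; `ScalaSecondarySortPort` and its methods are illustrative names, not part of the book's code. Scala strings are `java.lang.String`, so `hashCode` and therefore the partition ids are the same as on Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ScalaSecondarySortPort {

    // Key as built in SecondarySort.scala: (name + "-" + time, value)
    static final class Key {
        final String nameTime;
        final int value;

        Key(String nameTime, int value) {
            this.nameTime = nameTime;
            this.value = value;
        }
    }

    // Port of CustomPartitioner.getPartition for a (String, Int) key:
    // only the first element ("name-time") is hashed.
    static int getPartition(Key key, int numPartitions) {
        return Math.abs(key.nameTime.hashCode() % numPartitions);
    }

    // Port of tupleOrderingDesc: descending by the string, then descending by the value.
    static final Comparator<Key> DESC = (x, y) -> {
        int c = y.nameTime.compareTo(x.nameTime);
        return c != 0 ? c : Integer.compare(y.value, x.value);
    };

    // Simulates repartitionAndSortWithinPartitions; returns the "name-time" strings per partition.
    static List<List<String>> run(List<String> lines, int numPartitions) {
        List<List<Key>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (String line : lines) {
            String[] f = line.split(",");  // no trim, as in the Scala code
            Key k = new Key(f[0] + "-" + f[1], Integer.parseInt(f[2]));
            parts.get(getPartition(k, numPartitions)).add(k);
        }
        List<List<String>> out = new ArrayList<>();
        for (List<Key> p : parts) {
            p.sort(DESC);
            List<String> keys = new ArrayList<>();
            for (Key k : p) {
                keys.add(k.nameTime);
            }
            out.add(keys);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(" x,2,9", " y,2,5", " x,1,3", " y,1,7", " y,3,1",
                " x,3,6", " z,1,4", " z,2,8", " z,3,7", " z,4,0", " p,1,10", " p,3,60",
                " p,4,40", " p,6,20");
        List<List<String>> result = run(input, 3);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("partition " + i + ": " + result.get(i));
        }
    }
}
```

Running it yields the same three groups, in the same order, as the job output below: `" z-2", " y-3", " x-1", " p-6", " p-3"`, then `" z-3", " y-1", " x-2", " p-4", " p-1"`, then `" z-4", " z-1", " y-2", " x-3"`.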
    

    Package it as a jar and submit it with spark-submit

    [root@master bin]# ./spark-submit --master local[2]  --class org.dataalgorithms.chap01.scala.SecondarySort /root/Data/data_algorithms/chapter1/hadoop_spark_scala.jar 3 /data_algorithms/chapter1/input2 /data_algorithms/chapter1/output2
    19/04/18 00:07:42 INFO spark.SparkContext: Running Spark version 2.2.1
    19/04/18 00:07:43 INFO spark.SparkContext: Submitted application: SecondarySort
    19/04/18 00:07:43 INFO spark.SecurityManager: Changing view acls to: root
    19/04/18 00:07:43 INFO spark.SecurityManager: Changing modify acls to: root
    19/04/18 00:07:43 INFO spark.SecurityManager: Changing view acls groups to: 
    19/04/18 00:07:43 INFO spark.SecurityManager: Changing modify acls groups to: 
    19/04/18 00:07:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    19/04/18 00:07:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 39070.
    19/04/18 00:07:44 INFO spark.SparkEnv: Registering MapOutputTracker
    19/04/18 00:07:44 INFO spark.SparkEnv: Registering BlockManagerMaster
    19/04/18 00:07:44 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    19/04/18 00:07:44 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    19/04/18 00:07:44 INFO storage.DiskBlockManager: Created local directory at /opt/spark-2.2.1-bin-hadoop2.7/blockmgr-72b30be5-8b8a-4589-90da-20596018bfff
    19/04/18 00:07:44 INFO memory.MemoryStore: MemoryStore started with capacity 93.3 MB
    19/04/18 00:07:44 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    19/04/18 00:07:44 INFO util.log: Logging initialized @2833ms
    19/04/18 00:07:44 INFO server.Server: jetty-9.3.z-SNAPSHOT
    19/04/18 00:07:44 INFO server.Server: Started @2926ms
    19/04/18 00:07:44 INFO server.AbstractConnector: Started ServerConnector@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    19/04/18 00:07:44 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@120f38e6{/jobs,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@fac80{/jobs/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@649f2009{/jobs/job,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@797501a{/jobs/job/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57f791c6{/stages,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c4f9535{/stages/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30c31dd7{/stages/stage,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@241a53ef{/stages/stage/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2db2cd5{/stages/pool,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@615f972{/stages/pool/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@73393584{/storage,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1827a871{/storage/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7249dadf{/storage/rdd,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@66238be2{/storage/rdd/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@200606de{/environment,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@f8908f6{/environment/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2ef8a8c3{/executors,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@63fd4873{/executors/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7544a1e4{/executors/threadDump,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7957dc72{/executors/threadDump/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3aacf32a{/static,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3d4d3fe7{/,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51684e4a{/api,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c451c9c{/jobs/job/kill,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@372b0d86{/stages/stage/kill,null,AVAILABLE,@Spark}
    19/04/18 00:07:44 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.21.220:4040
    19/04/18 00:07:44 INFO spark.SparkContext: Added JAR file:/root/Data/data_algorithms/chapter1/hadoop_spark_scala.jar at spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar with timestamp 1555517264889
    19/04/18 00:07:45 INFO executor.Executor: Starting executor ID driver on host localhost
    19/04/18 00:07:45 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43710.
    19/04/18 00:07:45 INFO netty.NettyBlockTransferService: Server created on 172.16.21.220:43710
    19/04/18 00:07:45 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    19/04/18 00:07:45 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.21.220, 43710, None)
    19/04/18 00:07:45 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.16.21.220:43710 with 93.3 MB RAM, BlockManagerId(driver, 172.16.21.220, 43710, None)
    19/04/18 00:07:45 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.21.220, 43710, None)
    19/04/18 00:07:45 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.21.220, 43710, None)
    19/04/18 00:07:45 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4d8539de{/metrics/json,null,AVAILABLE,@Spark}
    19/04/18 00:07:46 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 240.0 KB, free 93.1 MB)
    19/04/18 00:07:46 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.2 KB, free 93.0 MB)
    19/04/18 00:07:46 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.16.21.220:43710 (size: 23.2 KB, free: 93.3 MB)
    19/04/18 00:07:46 INFO spark.SparkContext: Created broadcast 0 from textFile at SecondarySort.scala:30
    19/04/18 00:07:47 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
    19/04/18 00:07:47 INFO spark.SparkContext: Starting job: saveAsTextFile at SecondarySort.scala:54
    19/04/18 00:07:47 INFO mapred.FileInputFormat: Total input paths to process : 1
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Registering RDD 2 (map at SecondarySort.scala:36)
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SecondarySort.scala:54) with 3 output partitions
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at SecondarySort.scala:54)
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at SecondarySort.scala:36), which has no missing parents
    19/04/18 00:07:47 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 93.0 MB)
    19/04/18 00:07:47 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 93.0 MB)
    19/04/18 00:07:47 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.16.21.220:43710 (size: 2.4 KB, free: 93.3 MB)
    19/04/18 00:07:47 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
    19/04/18 00:07:47 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at SecondarySort.scala:36) (first 15 tasks are for partitions Vector(0, 1))
    19/04/18 00:07:47 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
    19/04/18 00:07:47 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4873 bytes)
    19/04/18 00:07:47 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4873 bytes)
    19/04/18 00:07:47 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
    19/04/18 00:07:47 INFO executor.Executor: Fetching spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar with timestamp 1555517264889
    19/04/18 00:07:47 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
    19/04/18 00:07:47 INFO client.TransportClientFactory: Successfully created connection to /172.16.21.220:39070 after 29 ms (0 ms spent in bootstraps)
    19/04/18 00:07:47 INFO util.Utils: Fetching spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar to /opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600/userFiles-d948e5f9-0504-4421-aa88-19f186e67f39/fetchFileTemp2679433532910837467.tmp
    19/04/18 00:07:48 INFO executor.Executor: Adding file:/opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600/userFiles-d948e5f9-0504-4421-aa88-19f186e67f39/hadoop_spark_scala.jar to class loader
    19/04/18 00:07:48 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/data_algorithms/chapter1/input2/timeseries.txt:0+51
    19/04/18 00:07:48 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/data_algorithms/chapter1/input2/timeseries.txt:51+51
    19/04/18 00:07:48 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1070 bytes result sent to driver
    19/04/18 00:07:48 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1070 bytes result sent to driver
    19/04/18 00:07:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 802 ms on localhost (executor driver) (1/2)
    19/04/18 00:07:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 801 ms on localhost (executor driver) (2/2)
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (map at SecondarySort.scala:36) finished in 0.836 s
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: looking for newly runnable stages
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: running: Set()
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: failed: Set()
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SecondarySort.scala:54), which has no missing parents
    19/04/18 00:07:48 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    19/04/18 00:07:48 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 71.9 KB, free 93.0 MB)
    19/04/18 00:07:48 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 26.0 KB, free 92.9 MB)
    19/04/18 00:07:48 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.16.21.220:43710 (size: 26.0 KB, free: 93.2 MB)
    19/04/18 00:07:48 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
    19/04/18 00:07:48 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SecondarySort.scala:54) (first 15 tasks are for partitions Vector(0, 1, 2))
    19/04/18 00:07:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
    19/04/18 00:07:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)
    19/04/18 00:07:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)
    19/04/18 00:07:48 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 2)
    19/04/18 00:07:48 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
    19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
    19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 12 ms
    19/04/18 00:07:48 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
    19/04/18 00:07:48 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
    19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000001_3' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000001
    19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000001_3: Committed
    19/04/18 00:07:49 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1224 bytes result sent to driver
    19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000000_2' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000000
    19/04/18 00:07:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, localhost, executor driver, partition 2, ANY, 4621 bytes)
    19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000000_2: Committed
    19/04/18 00:07:49 INFO executor.Executor: Running task 2.0 in stage 1.0 (TID 4)
    19/04/18 00:07:49 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1181 bytes result sent to driver
    19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 849 ms on localhost (executor driver) (1/3)
    19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 855 ms on localhost (executor driver) (2/3)
    19/04/18 00:07:49 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    19/04/18 00:07:49 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
    19/04/18 00:07:49 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
    19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000002_4' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000002
    19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000002_4: Committed
    19/04/18 00:07:49 INFO executor.Executor: Finished task 2.0 in stage 1.0 (TID 4). 1181 bytes result sent to driver
    19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 514 ms on localhost (executor driver) (3/3)
    19/04/18 00:07:49 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
    19/04/18 00:07:49 INFO scheduler.DAGScheduler: ResultStage 1 (saveAsTextFile at SecondarySort.scala:54) finished in 1.359 s
    19/04/18 00:07:49 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SecondarySort.scala:54, took 2.695980 s
    19/04/18 00:07:50 INFO server.AbstractConnector: Stopped Spark@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    19/04/18 00:07:50 INFO ui.SparkUI: Stopped Spark web UI at http://172.16.21.220:4040
    19/04/18 00:07:50 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    19/04/18 00:07:50 INFO memory.MemoryStore: MemoryStore cleared
    19/04/18 00:07:50 INFO storage.BlockManager: BlockManager stopped
    19/04/18 00:07:50 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    19/04/18 00:07:50 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    19/04/18 00:07:50 INFO spark.SparkContext: Successfully stopped SparkContext
    19/04/18 00:07:50 INFO util.ShutdownHookManager: Shutdown hook called
    19/04/18 00:07:50 INFO util.ShutdownHookManager: Deleting directory /opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600
    [root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/out*/p*
    ( z-2,8)
    ( y-3,1)
    ( x-1,3)
    ( p-6,20)
    ( p-3,60)
    ( z-3,7)
    ( y-1,7)
    ( x-2,9)
    ( p-4,40)
    ( p-1,10)
    ( z-4,0)
    ( z-1,4)
    ( y-2,5)
    ( x-3,6)
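This output is not grouped by name. The first element of the key is the concatenated `name-time` string, so `CustomPartitioner` hashes `x-1`, `x-2` and `x-3` independently and spreads one name's records across partitions, and `tupleOrderingDesc` then sorts each partition in descending order. A textbook secondary sort instead partitions on the natural key (the name) alone and sorts by the composite key (name, time). In the Scala job that would roughly mean keying by `(name, time)` (the existing `CustomPartitioner` then already hashes only the name) and dropping `tupleOrderingDesc` so the default ascending tuple ordering applies. The following plain-Java simulation (no Spark) sketches that scheme; `SecondarySortSketch`, `Rec` and `repartitionAndSort` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {

    // One parsed record: natural key (name), secondary key (time), and value.
    static final class Rec {
        final String name;
        final int time;
        final int value;

        Rec(String name, int time, int value) {
            this.name = name;
            this.time = time;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + name + "," + time + "," + value + ")";
        }
    }

    // Partition on the natural key only, so all records of one name share a partition.
    // Math.floorMod keeps the id in [0, numPartitions) even for negative hash codes.
    static int getPartition(String name, int numPartitions) {
        return Math.floorMod(name.hashCode(), numPartitions);
    }

    // Sort within a partition by name, then by time, both ascending.
    static final Comparator<Rec> BY_NAME_THEN_TIME =
            Comparator.comparing((Rec r) -> r.name).thenComparingInt(r -> r.time);

    // Simulates repartitionAndSortWithinPartitions: bucket by partition, then sort each bucket.
    static List<List<Rec>> repartitionAndSort(List<String> lines, int numPartitions) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String line : lines) {
            String[] t = line.trim().split(",");  // trim the leading space seen in the sample file
            Rec r = new Rec(t[0], Integer.parseInt(t[1]), Integer.parseInt(t[2]));
            partitions.get(getPartition(r.name, numPartitions)).add(r);
        }
        for (List<Rec> p : partitions) {
            p.sort(BY_NAME_THEN_TIME);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(" x,2,9", " y,2,5", " x,1,3", " y,1,7", " y,3,1",
                " x,3,6", " z,1,4", " z,2,8", " z,3,7", " z,4,0", " p,1,10", " p,3,60",
                " p,4,40", " p,6,20");
        List<List<Rec>> result = repartitionAndSort(input, 3);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("partition " + i + ": " + result.get(i));
        }
    }
}
```

With this scheme every partition holds complete names with their records in time order (for example `(x,1,3), (x,2,9), (x,3,6)`), which matches the per-name result of the Java `combineByKey` version without building a map per key.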
    
    


          Article link: https://www.haomeiwen.com/subject/rilewqtx.html