Spark Word Count Notes


Author: dravenxiaokai | Published 2018-10-30 16:54

    1. sc
    SparkContext, the entry point of a Spark program; it encapsulates the information about the entire Spark runtime environment.
    2. Enter spark-shell

    $> spark-shell
    scala> sc
    

    API:
    SparkContext
    RDD:
    resilient distributed dataset; it behaves much like an ordinary (but distributed) collection.
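    For example, an RDD can be created from a local Scala collection with sc.parallelize and then transformed much like an ordinary collection (toy values, purely for illustration):

    scala> val nums = sc.parallelize(Seq(1, 2, 3, 4))
    scala> nums.map(_ * 2).collect
    res0: Array[Int] = Array(2, 4, 6, 8)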
    Implementing WordCount in Spark

    // Load the text file; it is split on newlines, one element per line: Array(hello world2, hello world2, ...)
    val rdd1 = sc.textFile("/home/ubuntu/test.txt")
    val rdd2 = rdd1.flatMap(line => line.split(" "))   // split each line into words
    val rdd3 = rdd2.map(word => (word, 1))             // map each word to (word, 1)
    val rdd4 = rdd3.reduceByKey(_ + _)                 // sum the counts per word
    rdd4.collect
    As a single line:
    scala> sc.textFile("/home/ubuntu/test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    Result:
    res2: Array[(String, Int)] = Array((world2,2), (world4,1), (hello,4), (world3,1))
    Filter for words containing "wor":
    scala> sc.textFile("/home/ubuntu/test.txt").flatMap(_.split(" ")).filter(_.contains("wor")).map((_,1)).reduceByKey(_+_).collect
    res3: Array[(String, Int)] = Array((world2,2), (world4,1), (world3,1))
    

    On Windows:
    Write the Scala program in IDEA, pull in the Spark libraries, and implement WordCount.
    1. Add Scala framework support to the project; install the Scala plugin if it is missing (Scala 2.11.8). Spark 2.3.2 (the latest at the time of writing) is built against Scala 2.11.
    2. Add the Spark dependency in Maven:

     <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.2</version>
     </dependency>
    

    Scala version

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Scala version
      */
    object WordCountScala {
      def main(args: Array[String]): Unit = {
        // Create the Spark configuration object
        val conf = new SparkConf();
        //conf.setAppName("WordCountScala");
        // Set the master property (left commented; it is supplied via spark-submit)
        //conf.setMaster("local");
        // Create the SparkContext from the conf
        val sc = new SparkContext(conf);
    
        // Load the text file (the path comes from the first program argument)
    //    val rdd1 = sc.textFile("d:/scala/test.txt");
        val rdd1 = sc.textFile(args(0));
        // Flatten: split each line into words
        val rdd2 = rdd1.flatMap(line => line.split("\\s+"));
        // Map each word w => (w, 1)
        val rdd3 = rdd2.map((_, 1))
        val rdd4 = rdd3.reduceByKey(_ + _)   // sum the counts for each word
        val r = rdd4.collect()               // bring the results back to the driver
        r.foreach(println)
    
      }
    }
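
    To run WordCountScala directly inside IDEA instead of through spark-submit, uncomment the conf.setAppName("WordCountScala") and conf.setMaster("local") lines and pass the input file path (for example the d:/scala/test.txt path shown in the commented-out line) as a program argument.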
    

    Java version

    package com.it18zhang.spark.java;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Java version
     */
    public class WordCountJava2 {
        public static void main(String[] args) {
            // Create the SparkConf object
            SparkConf conf = new SparkConf();
            //conf.setAppName("WordCountJava2");
            //conf.setMaster("local");
    
            // The Spark context
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Load the text file
    //        JavaRDD<String> rdd1 = sc.textFile("d:/scala/test.txt");
            JavaRDD<String> rdd1 = sc.textFile(args[0]);
            // flatMap via an anonymous inner class implementing the callback interface
            JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String s) throws Exception {
                    List<String> list = new ArrayList<String>();
                    String[] arr = s.split("\\s+");
                    for (String ss:arr){
                        list.add(ss);
                    }
                    return list.iterator();
                }
            });
            // Map: word => (word, 1)
            JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
            // Reduce: sum the counts per key
            JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            List<Tuple2<String, Integer>> list = rdd4.collect();
            for (Tuple2<String, Integer> t : list) {
                System.out.println(t._1() + ":" + t._2());
            }
        }
    }
    

    Package the project into SparkDemo1-1.0-SNAPSHOT.jar.
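
    For Maven to compile the Scala sources before packaging, the pom also needs a Scala build plugin; a minimal sketch, assuming the scala-maven-plugin (the version shown is only an example):

     <build>
         <plugins>
             <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
                 <!-- example version; adjust to your environment -->
                 <version>3.4.4</version>
                 <executions>
                     <execution>
                         <goals>
                             <goal>compile</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
         </plugins>
     </build>

    With that in place, mvn package produces SparkDemo1-1.0-SNAPSHOT.jar.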

    spark-submit --master local --class com.it18zhang.spark.scala.WordCountScala --name MyWordCount SparkDemo1-1.0-SNAPSHOT.jar /home/ubuntu/test.txt 
    

    Spark cluster modes

    1. local
      Nothing to configure.
      spark-shell --master local   // the default
    2. standalone
      Standalone mode
    a. Copy the spark directory to the other hosts.
    b. Configure the environment variables on every other host (see the sketch after this list):
      [/etc/profile]
      SPARK_HOME
      PATH
    c. Configure the conf/slaves file on the master node:
      s1
      s2
      s3
    d. Start the Spark cluster:
    /soft/spark/sbin/start-all.sh
    e. Web UI:
      http://s0:8080/
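
    A minimal sketch of the /etc/profile entries from step b, assuming Spark is installed under /soft/spark (the same path used with start-all.sh above):

      # assumes Spark is installed at /soft/spark on every host
      export SPARK_HOME=/soft/spark
      export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin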
    

    Submitting a job to the fully distributed Spark cluster

    1. Start the Hadoop cluster (only HDFS is needed):
      start-dfs.sh
    2. Put the input file into HDFS:
      hdfs dfs -put test.txt /user/ubuntu
    3. Run spark-submit:
    spark-submit --master spark://s0:7077 --class com.it18zhang.spark.scala.WordCountScala --name MyWordCount SparkDemo1-1.0-SNAPSHOT.jar hdfs://s0:8020/user/ubuntu/test.txt 
    
    ubuntu@s0:~$ xcall.sh jps
    ============ s0 jps =============
    3207 NameNode
    4504 Jps
    3432 SecondaryNameNode
    3976 Master
    ============ s1 jps =============
    3522 Worker
    3845 Jps
    3276 DataNode
    ============ s2 jps =============
    3827 Jps
    3276 DataNode
    3517 Worker
    ============ s3 jps =============
    3197 DataNode
    3758 Jps
    3439 Worker
    
    Script analysis
    [start-all.sh]
      sbin/spark-config.sh
      sbin/start-master.sh   // starts the Master process
      sbin/start-slaves.sh   // starts the Worker processes
    
    
