美文网首页好程序员大数据
好程序员大数据教程:SparkShell和IDEA中编写Spar

好程序员大数据教程:SparkShell和IDEA中编写Spar

作者: ab6973df9221 | 来源:发表于2019-05-28 15:11 被阅读1次

    好程序员大数据教程:SparkShell和IDEA中编写Spark程序,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spark程序。spark-shell程序一般用作Spark程序测试练习来用。spark-shell属于Spark的特殊应用程序,我们可以在这个特殊的应用程序中提交应用程序

    spark-shell启动有两种模式,local模式和cluster模式,分别为

    local模式:

    spark-shell

    local模式仅在本机启动一个SparkSubmit进程,没有与集群建立联系,虽然进程中有SparkSubmit但是不会被提交到集群红

    Cluster模式(集群模式):

    spark-shell \--master spark://hadoop01:7077 \--executor-memory 512m \--total-executor-cores 1后两个命令不是必须的--master这条命令是必须的(除非在jar包中已经指可以不指定,不然就必须指定)

    退出shell

    千万不要ctrl+c spark-shell 正确退出 :quit 千万不要ctrl+c退出 这样是错误的 若使用了ctrl+c退出 使用命令查看监听端口 netstat - apn | grep 4040 在使用kill -9 端口号 杀死即可

    3.25.11 spark2.2shell和spark1.6shell对比

    ps:启动spark-shell若是集群模式,在webUI会有一个一直执行的任务

    通过IDEA创建Spark工程

    ps:工程创建之前步骤省略,在scala中已经讲解,直接默认是创建好工程的

    对工程中的pom.xml文件配置

     <properties>        <maven.compiler.source>1.8</maven.compiler.source>        <maven.compiler.target>1.8</maven.compiler.target>        <encoding>UTF-8</encoding>        <scala.version>2.11.8</scala.version>        <spark.version>2.2.0</spark.version>        <hadoop.version>2.7.1</hadoop.version>        <scala.compat.version>2.11</scala.compat.version>    </properties>    <dependencies>        <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-library</artifactId>            <version>${scala.version}</version>        </dependency>        <dependency>        <groupId>org.apache.spark</groupId>        <artifactId>spark-core_2.11</artifactId>        <version>${spark.version}</version>    </dependency>        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-client</artifactId>            <version>${hadoop.version}</version>        </dependency>    </dependencies> -->

    Spark实现WordCount程序


    Scala版本import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object SparkWordCount {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")    //创建sparkContext对象    val sc =  new SparkContext(conf)    //通过sparkcontext对象就可以处理数据    //读取文件 参数是一个String类型的字符串 传入的是路径    val lines: RDD[String] = sc.textFile(“dir/wordcount”)    //切分数据    val words: RDD[String] = lines.flatMap(_.split(" "))    //将每一个单词生成元组 (单词,1)    val tuples: RDD[(String, Int)] = words.map((_,1))    //spark中提供一个算子 reduceByKey 相同key 为一组进行求和 计算value    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)    //对当前这个结果进行排序 sortBy 和scala中sotrBy是不一样的 多了一个参数    //默认是升序  false就是降序    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)    //将数据提交到集群存储 无法返回值     sorted.foreach(println)    //回收资源停止sc,结束任务    sc.stop()  }}


    Java版本

    import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;import java.util.List; public class JavaWordCount {    public static void main(String[] args) {//1.先创建conf对象进行配置主要是设置名称,为了设置运行模式        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");//2.创建context对象        JavaSparkContext jsc = new JavaSparkContext(conf);        JavaRDD<String> lines = jsc.textFile("dir/file");//进行切分数据 flatMapFunction是具体实现类        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {            @Override            public Iterator<String> call(String s) throws Exception {                List<String> splited = Arrays.asList(s.split(" "));                return splited.iterator();            }        });//将数据生成元组//第一个泛型是输入的数据类型 后两个参数是输出参数元组的数据        JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String,                Integer>() {            @Override            public Tuple2<String, Integer> call(String s) throws Exception {                return new Tuple2<String, Integer>(s, 1);            }        });//聚合        JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer,                Integer>() {            @Override//第一个Integer是相同key对应的value//第二个Integer是相同key 对应的value            public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        });//因为Java api没有提供sortBy算子,此时需要将元组中的数据进行位置调换,然后在排序,排完序在换回//第一次交换是为了排序        JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String,                Integer>, Integer, String>() {            @Override            public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {                return tup.swap();            }        });//排序        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);//第二次交换是为了最终结果 <单词,数量>        JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer,                String>, String, Integer>() {            @Override            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception            {                return tuple2.swap();            }        });        System.out.println(res.collect());        res.saveAsTextFile("out1");        jsc.stop();    }}

    相关文章

      网友评论

        本文标题:好程序员大数据教程:SparkShell和IDEA中编写Spar

        本文链接:https://www.haomeiwen.com/subject/gdrdtctx.html