(1) Spark Streaming with Spark SQL: Reading from a Socket Source

Author: NBI大数据可视化分析 | Published 2022-08-31 10:59

    Spark Streaming is built on top of Spark Core's RDDs. It introduces a new abstraction, the DStream (Discretized Stream), which represents a continuous stream of data and serves as Spark Streaming's processing model. Internally, Spark Streaming slices the input data into segments by time interval (for example, one second) and turns each segment into a Spark RDD; this sequence of segments is the DStream, and every operation on a DStream is ultimately translated into operations on the underlying RDDs.
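    For example, a transformation declared once on a DStream is re-applied to the RDD behind every batch. A minimal sketch (the localhost:9999 source and one-second interval are illustrative, and the statements belong inside a main method; this is not from the original post):

    // Each one-second batch becomes one RDD, and the map below
    // runs on every one of those RDDs in turn.
    JavaStreamingContext ssc = new JavaStreamingContext(
            new SparkConf().setMaster("local[*]").setAppName("dstream-sketch"),
            Durations.seconds(1));
    JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    lines.map(String::length).print();   // prints per-batch line lengths
    ssc.start();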
    Spark SQL is the Spark module for processing structured data. Its predecessor was Shark, a tool built on top of Hive; Shark reworked the three modules in the lower right of the figure below (memory management, physical planning, and execution) so that they run on the Spark engine.


    [Figure 1: Shark's modifications to the Hive architecture]

    (1) POM dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.66</version>
        </dependency>
    </dependencies>
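
    The ${spark.version} and ${scala.version} placeholders are assumed to be defined in the POM's <properties> section, which the post does not show. To be consistent with the explicitly versioned 2.3.1 / _2.11 artifacts above, it would look something like:

    <properties>
        <!-- assumed values, matching the explicitly versioned artifacts above -->
        <spark.version>2.3.1</spark.version>
        <!-- Scala binary version used in the artifactId suffixes -->
        <scala.version>2.11</scala.version>
    </properties>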
    

    (2) Define the message object

    package com.pojo;
    
    import java.io.Serializable;
    import java.util.Date;
    
    /**
     * Created by lj on 2022-07-13.
     */
    public class WaterSensor implements Serializable {
        public String id;   // sensor id
        public long ts;     // event timestamp
        public int vc;      // sensor reading
    
        public WaterSensor(){
    
        }
    
        public WaterSensor(String id,long ts,int vc){
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    
        public int getVc() {
            return vc;
        }
    
        public void setVc(int vc) {
            this.vc = vc;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public long getTs() {
            return ts;
        }
    
        public void setTs(long ts) {
            this.ts = ts;
        }
    }
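
    In step (4) below, spark.createDataFrame(rdd, WaterSensor.class) infers the schema from this class's getters, which is why the no-argument constructor, the getter/setter pairs, and Serializable all matter. A quick local check (a sketch with made-up values; the statements belong inside a main method and are not part of the original post):

    import java.util.Arrays;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
            .master("local[*]").appName("bean-check").getOrCreate();
    // The schema is inferred from the getters: columns id, ts, vc.
    Dataset<Row> df = spark.createDataFrame(
            Arrays.asList(new WaterSensor("ws_001", 1657699200L, 42)),
            WaterSensor.class);
    df.show();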
    

    (3) Build the data producer

    package com.producers;
    
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Random;
    
    /**
     * Created by lj on 2022-07-12.
     */
    public class Socket_Producer {
        public static void main(String[] args) throws IOException {
    
            try {
                ServerSocket ss = new ServerSocket(9999);
                System.out.println("Server started, waiting for a client ...");
                Socket s = ss.accept();
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                String response = "java,1,2";   // placeholder; reassigned each loop iteration
    
                // send one message every 30 seconds
                int i = 0;
                Random r = new Random();   // unseeded Random
                String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};
    
                while(true){
                    response= lang[r.nextInt(lang.length)]+ i + "," + i + "," + i+"\n";
                    System.out.println(response);
                    try{
                        bw.write(response);
                        bw.flush();
                        i++;
                    }catch (Exception ex){
                        System.out.println(ex.getMessage());
                    }
                    Thread.sleep(1000 * 30);
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
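
    Before wiring up Spark, the producer can be sanity-checked with any TCP client. A throwaway reader like the following (a hypothetical helper, not part of the original post) should print one message every 30 seconds:

    package com.producers;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.Socket;

    public class Socket_Check {
        public static void main(String[] args) throws IOException {
            // Connect to the producer above and echo a few messages.
            try (Socket s = new Socket("localhost", 9999);
                 BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
                for (int i = 0; i < 3; i++) {
                    System.out.println(in.readLine());   // e.g. "spark3,3,3"
                }
            }
        }
    }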
    

    (4) Ingest the socket source with Spark Streaming and print Spark SQL's results:

    package com.examples;
    
    import com.pojo.WaterSensor;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    /**
     * Created by lj on 2022-07-16.
     */
    public class SparkSql_Socket1 {
        private static String appName = "spark.streaming.demo";
        private static String master = "local[*]";
        private static String host = "localhost";
        private static int port = 9999;
    
        public static void main(String[] args) {
            // initialize the SparkConf
            SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
    
            // create the JavaStreamingContext with a 1-minute batch interval
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
    
            // read data from the socket source
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
    
            // convert each batch's RDD to a DataFrame and run a SQL query
            lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
                @Override
                public void call(JavaRDD<String> rdd, Time time) {
                    SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    
                    // map each line to a WaterSensor bean (schema inferred by reflection)
                    JavaRDD<WaterSensor> rowRDD = rdd.map(new Function<String, WaterSensor>() {
                        @Override
                        public WaterSensor call(String line) {
                            String[] cols = line.split(",");
                            WaterSensor waterSensor = new WaterSensor(cols[0],Long.parseLong(cols[1]),Integer.parseInt(cols[2]));
                            return waterSensor;
                        }
                    });
    
                    Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, WaterSensor.class);
                    // register a temporary view
                    dataFrame.createOrReplaceTempView("log");
                    Dataset<Row> result = spark.sql("select * from log");
                    System.out.println("========= " + time + "=========");
                    // show() prints the first 20 rows
                    result.show();
                }
            });
    
            // start the job
            ssc.start();
            try {
                ssc.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ssc.close();
            }
        }
    }
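
    Note that the job calls JavaSparkSessionSingleton.getInstance(...), a helper class the post never shows. The lazily instantiated singleton used in the official Spark Streaming examples fills the gap; a minimal version (added here for completeness):

    package com.examples;

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    /** Lazily instantiated singleton SparkSession, reused across micro-batches. */
    class JavaSparkSessionSingleton {
        private static transient SparkSession instance = null;

        public static SparkSession getInstance(SparkConf sparkConf) {
            if (instance == null) {
                instance = SparkSession.builder()
                        .config(sparkConf)
                        .getOrCreate();
            }
            return instance;
        }
    }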
    

    (5) Demo:


    [Figure 2: console output of the streaming job]

    The code sets a one-minute batch interval, so a computation is triggered once every minute:


    [Figure 3: batches firing at one-minute intervals]
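
    To experiment with shorter feedback cycles, change the Duration passed when the context is constructed, for example (same API, just a smaller interval):

    // a 10-second batch interval instead of one minute
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));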
