美文网首页
输入DStream之基础数据源

输入DStream之基础数据源

作者: 一个人一匹马 | 来源:发表于2019-02-23 17:08 被阅读0次

    HDFS文件
    基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。
    streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
    streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
    Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core。

    基于HDFS的实时wordcount程序
    1、基于HDFS的实时wordcount程序

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    /**
    * 基于HDFS文件的实时wordcount程序
    * @author Administrator
    *
    */
    public class HDFSWordCount {
    
    ​public static void main(String[] args) {
    ​​SparkConf conf = new SparkConf()​​​​.setMaster("local[2]")​​​​.setAppName("HDFSWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    ​​// 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
    ​​JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");
    ​​​​// 执行wordcount操作
    ​​JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
    ​​​private static final long serialVersionUID = 1L;
    
    ​​​@Override
    ​​​public Iterable<String> call(String line) throws Exception {
    ​​​​return Arrays.asList(line.split(" "));
    ​​​}
    ​​});
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
    ​​​​​private static final long serialVersionUID = 1L;
    
    ​​​​​@Override
    ​​​​​public Tuple2<String, Integer> call(String word) ​​​​​​​throws Exception {
    ​​​​​​return new Tuple2<String, Integer>(word, 1);
    ​​​​​}
    ​​​​});
    ​​JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
    
    ​​​​new Function2<Integer, Integer, Integer>() {
    
    private static final long serialVersionUID = 1L;
    
    @Override
    ​​​​​public Integer call(Integer v1, Integer v2) throws Exception {
    ​​​​​​return v1 + v2;
    ​​​​​}​​​​
    ​​​​});
    wordCounts.print();
    ​​jssc.start();
    ​​jssc.awaitTermination();
    ​​jssc.close();
    ​}
    }
    

    验证:
    Hadoop fs –mkdir /wordCount_dir

    相关文章

      网友评论

          本文标题:输入DStream之基础数据源

          本文链接:https://www.haomeiwen.com/subject/rqppyqtx.html