美文网首页
【Spark】Java API中的一些Function的说明

【Spark】Java API中的一些Function的说明

作者: 小北觅 | 来源:发表于2022-06-08 23:01 被阅读0次

本文首先介绍一些Spark的Java 函数API,然后利用一个Java版本的Spark Streaming WordCount程序演示这些API,以及UpdatestateBykey算子。

参考官方文档:

Java Programming Guide - Spark 0.8.0 Documentation

Class Function Type
Function<T, R> T => R
DoubleFunction<T> T => Double
PairFunction<T, K, V> T => Tuple2<K, V>
FlatMapFunction<T, R> T => Iterable<R>
DoubleFlatMapFunction<T> T => Iterable<Double>
PairFlatMapFunction<T, K, V> T => Iterable<Tuple2<K, V>>
Function2<T1, T2, R> T1, T2 => R (function of two arguments)

以上每个类都有一个抽象的类方法call(),实现类必须实现该方法。

一、Function<T, R>

该类表示将T转换为R。

JavaRDD<String> map = logData.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + " add";
            }
    });

上述代码的将logData中的每个字符串转换为一个新的字符串,方法是在原来的字符串上,增加一个固定字符串" add"。

二、DoubleFunction<T>

将T转换为Double。也可以用Function<T, Double>代替。

DoubleFunction<String> doubleFunction = new DoubleFunction<String>() {
            @Override
            public double call(String s) throws Exception {
                return Double.valueOf(s).doubleValue();
            }
        };

三、PairFunction<T, K, V>

将T转换为Tuple2<K, V>,将一个元素转换为一个元组。

JavaPairRDD<String, Integer> javaPairRDD = logData.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

以上代码将一个字符串RDD转换一个元组RDD。

四、Function2<T1, T2, R>

T1, T2 => R,方法包含两个参数,T1,T2为输入参数,R为返回值。

Function2<Long, Long, Long> function2 = new Function2<Long, Long, Long>() {
            @Override
            public Long call(Long t1, Long t2) throws Exception {
                return t1 + t2;
            }
        };

以上为将两个参数求和返回的实现。

五、使用UpdatestateBykey算子演示。

Spark Streaming---UpdatestateBykey(java)

package com.spark.streaming;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

public class UpdateStateByKeyWordcount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf , Durations.seconds(5));
        jssc.checkpoint(".");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node15", 8888);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        JavaPairDStream<String, Integer> wordcounts = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

            private static final long serialVersionUID = 1L;

            // 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数,第一个参数values相当于这个batch中
            // 这个key对应的新的一组值,可能有多个,可能2个1,(xuruyun,1)(xuruyun,1),那么这个values就是(1,1)
            // 那么第二个参数表示的是这个key之前的状态,我们看类型Integer你也就知道了,这里是泛型咱们自己指定的

            @Override
            public Optional<Integer> call(List<Integer> values,
                    Optional<Integer> state) throws Exception {

                // Optional其实有两个子类,一个子类是Some,一个子类是None
                // 就是key有可能之前从来都没有出现过,意味着之前从来没有更新过状态
                Integer newValue = 0;
                if(state.isPresent()){
                    newValue = state.get();
                }
                for(Integer value : values){
                    newValue += value;
                }

                return Optional.of(newValue);
            }
        });

        wordcounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
        jssc.close();
    }
}

https://blog.csdn.net/liaomingwu/article/details/122507550
https://blog.csdn.net/ymf827311945/article/details/77411137

相关文章

网友评论

      本文标题:【Spark】Java API中的一些Function的说明

      本文链接:https://www.haomeiwen.com/subject/eyepmrtx.html