本文首先介绍一些Spark的Java 函数API,然后利用一个Java版本的Spark Streaming WordCount程序演示这些API,以及UpdatestateBykey算子。
参考官方文档:
Java Programming Guide - Spark 0.8.0 Documentation
Class | Function Type |
---|---|
Function<T, R> | T => R |
DoubleFunction<T> | T => Double |
PairFunction<T, K, V> | T => Tuple2<K, V> |
FlatMapFunction<T, R> | T => Iterable<R> |
DoubleFlatMapFunction<T> | T => Iterable<Double> |
PairFlatMapFunction<T, K, V> | T => Iterable<Tuple2<K, V>> |
Function2<T1, T2, R> | T1, T2 => R (function of two arguments) |
以上每个类都有一个抽象的类方法call(),实现类必须实现该方法。
一、Function<T, R>
该类表示将T转换为R。
JavaRDD<String> map = logData.map(new Function<String, String>() {
@Override
public String call(String s) throws Exception {
return s + " add";
}
});
上述代码的将logData中的每个字符串转换为一个新的字符串,方法是在原来的字符串上,增加一个固定字符串" add"。
二、DoubleFunction<T>
将T转换为Double。也可以用Function<T, Double>代替。
DoubleFunction<String> doubleFunction = new DoubleFunction<String>() {
@Override
public double call(String s) throws Exception {
return Double.valueOf(s).doubleValue();
}
};
三、PairFunction<T, K, V>
将T转换为Tuple2<K, V>,将一个元素转换为一个元组。
JavaPairRDD<String, Integer> javaPairRDD = logData.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
以上代码将一个字符串RDD转换一个元组RDD。
四、Function2<T1, T2, R>
T1, T2 => R,方法包含两个参数,T1,T2为输入参数,R为返回值。
Function2<Long, Long, Long> function2 = new Function2<Long, Long, Long>() {
@Override
public Long call(Long t1, Long t2) throws Exception {
return t1 + t2;
}
};
以上为将两个参数求和返回的实现。
五、使用UpdatestateBykey算子演示。
Spark Streaming---UpdatestateBykey(java)
package com.spark.streaming;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.base.Optional;
import scala.Tuple2;
public class UpdateStateByKeyWordcount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf , Durations.seconds(5));
jssc.checkpoint(".");
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node15", 8888);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordcounts = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
private static final long serialVersionUID = 1L;
// 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数,第一个参数values相当于这个batch中
// 这个key对应的新的一组值,可能有多个,可能2个1,(xuruyun,1)(xuruyun,1),那么这个values就是(1,1)
// 那么第二个参数表示的是这个key之前的状态,我们看类型Integer你也就知道了,这里是泛型咱们自己指定的
@Override
public Optional<Integer> call(List<Integer> values,
Optional<Integer> state) throws Exception {
// Optional其实有两个子类,一个子类是Some,一个子类是None
// 就是key有可能之前从来都没有出现过,意味着之前从来没有更新过状态
Integer newValue = 0;
if(state.isPresent()){
newValue = state.get();
}
for(Integer value : values){
newValue += value;
}
return Optional.of(newValue);
}
});
wordcounts.print();
jssc.start();
jssc.awaitTermination();
jssc.stop();
jssc.close();
}
}
https://blog.csdn.net/liaomingwu/article/details/122507550
https://blog.csdn.net/ymf827311945/article/details/77411137
网友评论