在spark中大量使用了隐式转换,例如mapWithState方法并不在DStream中,但是在官方的样例StatefulNetworkWordCount中却可以使用
val stateDstream = wordDstream.mapWithState(
StateSpec.function(mappingFunc).initialState(initialRDD))
这就是因为在DStream的伴生对象中有一个隐式转换
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
PairDStreamFunctions[K, V] = {
new PairDStreamFunctions[K, V](stream)
}
只要是KV类型的DStream才会转换为PairDStreamFunctions对象,而mapWithState方法就是在PairDStreamFunctions中定义的。
样例
下面用一个简单的例子说明一下这种隐式转换是如何使用的。
class PairDStreamFunctions{
def mapWithState(item : String) = println("Save " + item + " from PairDStreamFunctions")
}
class OtherDStreamFunctions{
def mapWithState(item : String) = println("Save " + item + " from OtherDStreamFunctions")
}
//定义类
class DStream[T: ClassTag]
//定义伴生对象
object DStream{
//对于所有KV类型的DStream转换为PairDStreamFunctions对象
implicit def toPairDStreamFunctions[K, V](s : DStream[(K, V)]) = new PairDStreamFunctions
//对于所有单值类型的DStream转换为OtherDStreamFunctions对象
implicit def toOtherDStreamFunctions[V](s : DStream[V]) = new OtherDStreamFunctions
}
object ImplicitDemo{
def main(args: Array[String]): Unit = {
val ds_kv = new DStream[(String, Int)]
//由于定义的DStream为KV类型
//所以会隐式转换为PairDStreamFunctions
//也就是说下面调用的是PairDStreamFunctions中的mapWithState方法
ds_kv.mapWithState("bread")
val ds = new DStream[String]
//由于定义的DStream为单值类型
//所以会隐式转换为OtherDStreamFunctions
//也就是说下面调用的是OtherDStreamFunctions中的mapWithState方法
ds.mapWithState("jam")
}
}
即使类DStream中没有定义任何的方法,DStream实例也可以使用mapWithState方法。样例的输出结果为
Save bread from PairDStreamFunctions
Save jam from OtherDStreamFunctions
总结
使用这种方式,可以根据泛型实现不同方法的调用。
网友评论