之前有看过kafka源码,有很多implict声明的方法,当时看的一头雾水,今天趁着空闲,了解下scala 的隐式转换和柯理化相关语法知识.
隐式转换#
需要类中的一个方法,但是这个类没有提供这样的一个方法,所以我们需要隐式转换,转换成提供了这个方法的类,然后再调用这个方法
import java.io.File
import scala.io.Source
class RichFile(val file: File) {
def read = {
Source.fromFile(file.getPath).mkString
}
}
object Context {
implicit def file2RichFile(file: File) = new RichFile(file)
}
object Hello_Implicit_Conversions {
def main(args: Array[String]): Unit = {
import Context.file2RichFile
println(new File("/Users/mac/Downloads/Kafka.json").read)
}
}
整体流程看下图:

隐式参数与隐式值
两种用法搭配起来来达到一个效果,隐式参数表明这个参数是可以缺少的,也就是说在调用的时候这个参数可以不用出现,那么这个值由什么填充呢? 那就是用隐式的值了,以下的例子说明了这一点:
object Context_Implicits {
//隐式值
implicit val default: String = "Java"
}
object Param {
//函数中用implicit关键字 定义隐式参数
def print (context: String) (implicit language: String) {
println(language + ":" + context)
}
}
object Implicit_Parameters {
def main(args: Array[String]): Unit = {
//隐式参数正常是可以传值的,和普通函数传值一样 但是也可以不传值,因为有缺省值(默认配置)
Param.print("Spark")("Scala")
import Context_Implicits._
//隐式参数没有传值,编译器会在全局范围内搜索 有没有implicit String类型的隐式值 并传入
Param.print("Hadoop")
}
}
implicit class 隐式类
implicit class MyClass(x: Int)`
作用:
这里的作用主要是其主构造函数可以作为隐式转换的参数,相当于其主构造函数可以用来当做一个implicit的function,下面举例说明一下:
object Test extends App {
implicit class MyName(x: Int) {
println("im in cons")
val y = x
}
def say(x: MyName) = {
println(x.y)
}
say(5)
}
输出结果:
5
这里的MyName是一个隐式类,其主构造函数可以用作隐式转换,所以say需要一个MyName类型的参数,但是调用的时候给的是一个Int,这里就会调用MyName的主构造函数转换为一个MyName的对象,然后再println其y的值
柯理化 Currying#
柯里化指的是将一个接收多个参数的函数分解成多个接收单个参数的函数的一种技术。
比如说有这样一个普通的函数
def minus(x: Int, y: Int) = x - y`</pre>
柯理化后就变成以下形式,一个减法操作被分割为两部分
def minusCurrying(x: Int)(y: Int) = x - y
调用以上两个函数
minus(5, 3)
minusCurrying(5)(3)
隐式(IMPLICIT)参数
如果要指定参数列表中的某些参数为隐式(implicit),应该使用多参数列表。例如:
def execute(arg: Int)(implicit ec: ExecutionContext) = ???
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val query = lines
.map(value => {
val record = Utils.json2Map(value)
val vin = ConfigUtils.getAsString(record, "vin")
val collectTime = ConfigUtils.getAsLong(record, "collectTime")
val longitude = ConfigUtils.getAsDouble(record, "longitude")
val latitude = ConfigUtils.getAsDouble(record, "latitude")
Entity(vin, new java.sql.Timestamp(collectTime), longitude, latitude)
})
.withWatermark("collectTime", "1 minutes")
.groupByKey(r =>r.vin)
//.mapGroupsWithState[VehicleTrip,VehicleResult](timeoutConf = GroupStateTimeout.NoTimeout)( func = mapGroupsWithStateFunction) //和withWatermark 一致 不好用不知道怎么输出
.flatMapGroupsWithState(outputMode = OutputMode.Append,timeoutConf = GroupStateTimeout.NoTimeout)( (vin: String, entitys: Iterator[Entity], state: GroupState[VehicleTrip]) => flatMapGroupsWithStateFunction(vin, entitys, state)(assignConfigBC))
.writeStream
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime(batchDuration * 1000L))
.start
query.awaitTermination()
}
def mapGroupsWithStateFunction(vin: String, entitys: Iterator[Entity], state: GroupState[VehicleTrip]): VehicleResult = {
var vehicelTrip = state.getOption.getOrElse(null)
if (vehicelTrip == null) {
vehicelTrip = new VehicleTrip(vin,0L)
}
entitys.foreach(en =>{
vehicelTrip.count += 1L
})
state.update(vehicelTrip)
VehicleResult(vehicelTrip,Array.empty, Array.empty)
}
def flatMapGroupsWithStateFunction(vin: String, entitys: Iterator[Entity], state: GroupState[VehicleTrip])(broadcast: Broadcast[StructuredStreamingAssignConfig]): Iterator[VehicleResult] = {
var vehicelTrip = state.getOption.getOrElse(null)
if (vehicelTrip == null) {
vehicelTrip = new VehicleTrip(vin,0)
}
entitys.foreach(en =>{
vehicelTrip.count +=1L
})
state.update(vehicelTrip)
Iterator(VehicleResult(vehicelTrip,Array.empty, Array.empty))
}
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.spark.api.java.function;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability.Evolving;
import org.apache.spark.sql.streaming.GroupState;
@Experimental
@Evolving
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
Iterator<R> call(K var1, Iterator<V> var2, GroupState<S> var3) throws Exception;
}
网友评论