1. Defining the key class
class GroupKey extends Serializable {
  private var key1: String = ""
  private var key2: String = ""
  private var key3: String = ""
  private var count: Int = 0

  /**
   * Auxiliary constructor that fills the three key fields (count stays 0).
   */
  def this(key1: String, key2: String, key3: String) = {
    this()
    this.key1 = key1
    this.key2 = key2
    this.key3 = key3
  }

  /**
   * Override equals: two GroupKeys are equal when all four fields match.
   * (Keys built by keyBy all carry count = 0, so including count here
   * does not break the grouping.)
   */
  override def equals(obj: Any): Boolean = obj match {
    case temp: GroupKey =>
      this.key1 == temp.key1 &&
        this.key2 == temp.key2 &&
        this.key3 == temp.key3 &&
        this.count == temp.count
    case _ => false
  }

  /**
   * Override hashCode, consistent with equals: fold the fields' hash codes
   * with the conventional multiplier 31.
   */
  override def hashCode(): Int = {
    val state = Seq(key1, key2, key3, count)
    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
  }

  /**
   * Override toString: join the fields with underscores.
   */
  override def toString: String =
    this.key1 + "_" + this.key2 + "_" + this.key3 + "_" + this.count
}
/**
 * Companion object.
 */
object GroupKey {
  /** Accumulate groupKey2's count into groupKey1 and return groupKey1. */
  def merge(groupKey1: GroupKey, groupKey2: GroupKey): GroupKey = {
    groupKey1.count += groupKey2.count
    groupKey1
  }

  /** Parse one CSV line into a GroupKey; an empty count field becomes 0. */
  def fill(line: String): GroupKey = {
    // split with limit -1 so a trailing empty count field is kept
    val words: Array[String] = line.split(",", -1)
    val result = new GroupKey()
    result.key1 = words(0)
    result.key2 = words(1)
    result.key3 = words(2)
    result.count = words(3) match {
      case "" => 0
      case s  => s.toInt
    }
    result
  }
}
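As an aside: a Scala case class generates structural equals, hashCode, and toString automatically, which would remove most of the boilerplate above. A minimal sketch of that alternative (not the code this post actually runs):

// Hypothetical alternative: case classes get equals/hashCode/toString for free.
case class CaseGroupKey(key1: String, key2: String, key3: String, count: Int = 0)
// Structural equality holds out of the box:
// CaseGroupKey("a1", "b1", "c1") == CaseGroupKey("a1", "b1", "c1")  // true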
2. The aggregation job
import org.apache.spark.{SparkConf, SparkContext}

object GroupSum {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("GroupSum")
      .setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)
    val rdd = sparkContext.textFile("F:\\LearnSpark\\src\\main\\resources\\test.txt")
    rdd
      // build the grouping key from the first three fields
      .keyBy(line => {
        val words: Array[String] = line.split(",")
        new GroupKey(words(0), words(1), words(2))
      })
      // parse the original line into a GroupKey carrying its count
      .map(pair => (pair._1, GroupKey.fill(pair._2)))
      // sum the counts of all values that share a key
      .reduceByKey((groupKey1: GroupKey, groupKey2: GroupKey) =>
        GroupKey.merge(groupKey1, groupKey2))
      .foreach(pair =>
        println("key:" + pair._1.toString + " value:" + pair._2.toString))
    sparkContext.stop()
  }
}
- Test data
a1,b1,c1,1
a2,b2,c2,2
a3,b3,c3,3
a1,b1,c1,4
a1,b1,c2,5
a2,b2,c2,6
- Flow:
- keyBy
/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}
keyBy traverses the data with map and builds K-V pairs from the given (closure-cleaned) function cleanedF: the key K is the function's result, the value V is the original element. After this step the pairs look like:
key:a1_b1_c1_0 value:a1,b1,c1,1
key:a2_b2_c2_0 value:a2,b2,c2,2
key:a3_b3_c3_0 value:a3,b3,c3,3
key:a1_b1_c1_0 value:a1,b1,c1,4
key:a1_b1_c2_0 value:a1,b1,c2,5
key:a2_b2_c2_0 value:a2,b2,c2,6
- map
Based on the K-V pairs produced above, parse each original line to fill in the value side:
key:a1_b1_c1_0 value:a1_b1_c1_1
key:a1_b1_c1_0 value:a1_b1_c1_4
key:a1_b1_c2_0 value:a1_b1_c2_5
key:a2_b2_c2_0 value:a2_b2_c2_6
key:a2_b2_c2_0 value:a2_b2_c2_2
key:a3_b3_c3_0 value:a3_b3_c3_3
- reduceByKey
Aggregate the K-V pairs within each group, summing the counts:
key:a2_b2_c2_0 value:a2_b2_c2_8
key:a1_b1_c2_0 value:a1_b1_c2_5
key:a1_b1_c1_0 value:a1_b1_c1_5
key:a3_b3_c3_0 value:a3_b3_c3_3
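To make the reduceByKey step concrete, here is a minimal local sketch (plain Scala collections, no Spark; the sample pairs are hypothetical) that mimics the same grouping and merging. Note that GroupKey.merge mutates its first argument, which is acceptable in this sketch:

// Local imitation of reduceByKey: group by key, then fold values with merge.
val pairs = Seq(
  (new GroupKey("a1", "b1", "c1"), GroupKey.fill("a1,b1,c1,1")),
  (new GroupKey("a1", "b1", "c1"), GroupKey.fill("a1,b1,c1,4"))
)
val reduced = pairs
  .groupBy(_._1) // relies on GroupKey's equals/hashCode
  .map { case (k, vs) => (k, vs.map(_._2).reduce(GroupKey.merge)) }
reduced.foreach { case (k, v) => println("key:" + k + " value:" + v) }
// prints: key:a1_b1_c1_0 value:a1_b1_c1_5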
- Note 1
When keyBy uses an object as the key, different object instances are by default different keys.
For the grouping to work, equals() and hashCode() must be overridden so that objects with the same field values compare equal; only then do the field values effectively act as the grouping key.
Whenever equals() is overridden, hashCode() must be overridden too. Otherwise equal objects can end up with different hash codes, which violates the hashCode contract.
The hashCode contract:
- equal objects have equal hash codes
- unequal objects may still have equal hash codes
- equal hash codes do not imply equal objects
- unequal hash codes imply unequal objects
Reference: https://blog.csdn.net/zhengchao1991/article/details/78916471
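To see why the overrides matter, a small sketch (PlainKey is a hypothetical class, introduced only for contrast) showing that without them, objects with identical fields never compare equal and would each become their own group:

// Hypothetical class without equals/hashCode overrides, for contrast.
class PlainKey(val k: String)

println(new GroupKey("a1", "b1", "c1") == new GroupKey("a1", "b1", "c1")) // true  -> rows group together
println(new PlainKey("a1") == new PlainKey("a1"))                         // false -> every instance is its own key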
- Note 2
By default, hashCode is a native method defined on Object:
// Object.java
public native int hashCode();
String already overrides hashCode:
// String.java
public int hashCode() {
    int h = hash;
    if (h == 0 && value.length > 0) {
        char val[] = value;
        for (int i = 0; i < value.length; i++) {
            h = 31 * h + val[i];
        }
        hash = h;
    }
    return h;
}
The GroupKey fields above are String and Int; String already overrides hashCode (and an Int hashes to its own value), so computing the class's hash code from its fields' hash codes is safe.
If a field is itself a custom object, that object's class must also override hashCode (and equals); otherwise the computed hash code falls back to identity hashing and the grouping breaks.
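A minimal sketch of that rule (both classes are hypothetical): a key class whose field is an object only hashes correctly when the nested class defines equals/hashCode itself:

// Hypothetical nested key: Inner must override equals/hashCode itself,
// otherwise Outer's field-based hashCode inherits Object's identity hash.
class Inner(val id: String) {
  override def equals(obj: Any): Boolean = obj match {
    case other: Inner => id == other.id
    case _            => false
  }
  override def hashCode(): Int = id.hashCode
}

class Outer(val inner: Inner) {
  override def equals(obj: Any): Boolean = obj match {
    case other: Outer => inner == other.inner
    case _            => false
  }
  override def hashCode(): Int = 31 * inner.hashCode
}

// Two Outers built from equal Inners now hash and compare the same:
// new Outer(new Inner("x")) == new Outer(new Inner("x"))  // true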