美文网首页
Spark基于对象的统计

Spark基于对象的统计

作者: Jorvi | 来源:发表于2019-11-08 15:28 被阅读0次

1. 定义类

class GroupKey extends Serializable {
  private var key1:String = "";
  private var key2:String = "";
  private var key3:String = "";
  private var count:Int = 0;

  /**
   * 构造函数
   * @param key1
   * @param key2
   * @param key3
   */
  def this(key1:String, key2:String, key3:String) = {
    this();
    this.key1 = key1;
    this.key2 = key2;
    this.key3 = key3;
  }

  /**
   * 重写equals方法
   * @param obj
   * @return
   */
  override def equals(obj: Any): Boolean = {
    obj match {
      case temp: GroupKey =>
        return (this.key1 == temp.key1 &&
          this.key2 == temp.key2 &&
          this.key3 == temp.key3 &&
          this.count == temp.count);
      case _ => false;
    }
  }

  /**
   * 重写hashCode方法
   * @return
   */
  override def hashCode(): Int = {
    val state = Seq(key1, key2, key3, count)
    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
  }

  /**
   * 重写toString方法
   * @return
   */
  override def toString: String = {
    return this.key1 + "_" + this.key2 + "_" + this.key3 +"_" + this.count;
  }

}


/**
 * 伴生对象
 */
object GroupKey {

  def merge(groupKey1: GroupKey, groupKey2: GroupKey):GroupKey = {
    groupKey1.count += groupKey2.count;
    return groupKey1;
  }

  def fill(line:String):GroupKey = {
    val words:Array[String] = line.split(",");
    var result = new GroupKey();
    result.key1 = words(0);
    result.key2 = words(1);
    result.key3 = words(2);
    result.count = words(3) match {
      case "" => 0
      case _ => words(3).toInt
    };
    return result;
  }
}

2. 统计

import org.apache.spark.{SparkConf, SparkContext}

object GroupSum {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf();
    sparkConf
      .setAppName("GroupSum")
      .setMaster("local[*]");

    val sparkContext = new SparkContext(sparkConf);

    val rdd = sparkContext.textFile("F:\\LearnSpark\\src\\main\\resources\\test.txt");

    rdd
      .keyBy(line => {
        val words:Array[String] = line.split(",");
        new GroupKey(words(0), words(1), words(2));
      })
      .map(line => {
        (line._1, GroupKey.fill(line._2))
      })
      .reduceByKey((groupKey1:GroupKey, groupKey2:GroupKey) => {
        GroupKey.merge(groupKey1, groupKey2);
      })
      .foreach(temp => {
        println("key:" + temp._1.toString() + " value:" + temp._2.toString())
      })
  }
}
  • 测试数据
a1,b1,c1,1
a2,b2,c2,2
a3,b3,c3,3
a1,b1,c1,4
a1,b1,c2,5
a2,b2,c2,6
  • 流程:
  1. keyBy
/**
   * Creates tuples of the elements in this RDD by applying `f`.
   */
  def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
    val cleanedF = sc.clean(f)
    map(x => (cleanedF(x), x))
  }

该方法利用map遍历数据,然后基于给定的函数cleanedF生成 K-V 对,K是给定函数产生的结果,V是原始数据。

key:a1_b1_c1_0 value:a1,b1,c1,1
key:a2_b2_c2_0 value:a2,b2,c2,2
key:a3_b3_c3_0 value:a3,b3,c3,3
key:a1_b1_c1_0 value:a1,b1,c1,4
key:a1_b1_c2_0 value:a1,b1,c2,5
key:a2_b2_c2_0 value:a2,b2,c2,6
  1. map
    基于上述产生的 K-V 对,遍历填充 V 里面的值
key:a1_b1_c1_0 value:a1_b1_c1_1
key:a1_b1_c1_0 value:a1_b1_c1_4
key:a1_b1_c2_0 value:a1_b1_c2_5
key:a2_b2_c2_0 value:a2_b2_c2_6
key:a2_b2_c2_0 value:a2_b2_c2_2
key:a3_b3_c3_0 value:a3_b3_c3_3
  1. reduceByKey
    对上述 K-V 对,同组聚合
key:a2_b2_c2_0 value:a2_b2_c2_8
key:a1_b1_c2_0 value:a1_b1_c2_5
key:a1_b1_c1_0 value:a1_b1_c1_5
key:a3_b3_c3_0 value:a3_b3_c3_3
  • 注意一

在 keyBy 时,以对象作为key,正常情况下,不同对象就是不同的key。

要想正确运行,必须重写 equals() 和 hashcode() 方法,让不同的对象在字段相同时就认为相同,这样就可以根据字段的值作为key分组了。

重写 equals() 方法时,必须重写 hashcode() 方法。否则会出现对象相同,但 hashcode 不同的情况,这违反了 hashcode 的原则。

hashcode 原则:

  1. 对象相同,hashcode相同
  2. 对象不同,hashcode可能相同
  3. hashcode相同,对象可能不同
  4. hashcode不同,对象不同

参考:https://blog.csdn.net/zhengchao1991/article/details/78916471

  • 注意二

正常情况下,hashcode计算方法:

// Object.java

public native int hashCode();

String 的 hashcode 已经重写:

// String.java

public int hashCode() {
    int h = hash;
    if (h == 0 && value.length > 0) {
        char val[] = value;

        for (int i = 0; i < value.length; i++) {
            h = 31 * h + val[i];
        }
    hash = h;
    }
    return h;
}

上面的GroupKey类中属性字段是String和int,其已经被重写过hashCode方法,所以用字段的hashcode计算类的hashcode不会出错。

如果属性字段中有对象,那么这些对象也必须要重写hashCode方法以保证每次计算出的hashcode正确。

相关文章

网友评论

      本文标题:Spark基于对象的统计

      本文链接:https://www.haomeiwen.com/subject/xovvbctx.html