美文网首页
Scala Parallelization

Scala Parallelization

作者: aihex | 来源:发表于2016-07-20 16:02 被阅读0次

    Coursera - Parallel Programming

    <p>

    Week 3 - Data Parallelism

    <p>

    Homework
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}
    
    import scala.concurrent.ExecutionContext.Implicits.global
    
    
    /** Marker trait with no members; the common supertype of `Iterable` and `ParIterable` in this file.
      *
      * @tparam A element type
      */
    trait GenIterable[A]
    
    /** Marker trait with no members of its own; used in this file as the upper bound for the
      * representation type of `IterableLike`.
      *
      * @tparam A element type
      */
    trait Iterable[A] extends GenIterable[A]
    
    /** Marker trait with no members of its own; used in this file as the upper bound for the
      * representation type of `ParIterableLike`.
      *
      * @tparam A element type
      */
    trait ParIterable[A] extends GenIterable[A]
    
    /** Minimal single-pass cursor over elements of type `A`.
      *
      * @tparam A element type
      */
    trait Iterator[A] {
      /** Returns the next element. Has parentheses because it advances the cursor. */
      def next(): A
    
      /** Whether another call to `next()` is allowed. */
      def hasNext: Boolean
    
      /** Consumes the rest of this iterator, combining elements left to right.
        *
        * @param z initial accumulator, returned unchanged when no elements remain
        * @param f combines the running accumulator (first argument) with the next element
        * @return the accumulator after the last element has been folded in
        */
      def foldLeft[B](z: B)(f: (B, A) => B): B = {
        // `hasNext` is checked before every `next()`, one element per step.
        @scala.annotation.tailrec
        def loop(acc: B): B =
          if (hasNext) loop(f(acc, next())) else acc
    
        loop(z)
      }
    }
    
    /** An iterator that can also hand out sub-splitters, which `fold` processes concurrently.
      *
      * @tparam A element type
      */
    trait Splitter[A] extends Iterator[A] {
      /** Sub-splitters for this splitter.
        * NOTE(review): presumably they partition the remaining elements between them;
        * the contract is not visible in this file - confirm with implementations.
        */
      def split: Seq[Splitter[A]]
    
      /** Size estimate compared against the sequential threshold in `fold`.
        * NOTE(review): presumably the number of elements not yet consumed - confirm.
        */
      def remaining: Int
    
      /** Folds the elements with `f`, processing sub-splitters in `Future`s once `remaining`
        * reaches 1000.
        *
        * `z` is used as the seed of every sequential leaf fold and again when the partial
        * results of the children are merged, so the result only matches a plain sequential
        * fold when `z` is a neutral element of `f`.
        *
        * The calling thread blocks (without timeout) until every child future has completed.
        *
        * @param z seed value for each leaf fold and each merge step
        * @param f combining function applied to elements and to partial results
        */
      def fold(z: A)(f: (A, A) => A): A =
        if (remaining >= 1000) {
          // One future per sub-splitter; each child applies the same threshold recursively.
          val pending = split.map { part =>
            Future {
              part.fold(z)(f)
            }
          }
          val partials = Await.result(Future.sequence(pending), Duration.Inf)
          partials.foldLeft(z)(f)
        } else {
          // Below the threshold: sequential fold inherited from Iterator.
          foldLeft(z)(f)
        }
    }
    
    /** Accumulator that receives elements one at a time and produces a `Repr`.
      *
      * In this file it is used by `filter`: matching elements are passed to `+=` and the
      * filtered collection is obtained from `result`.
      *
      * @tparam A    element type accepted by `+=`
      * @tparam Repr type produced by `result`
      */
    trait Builder[A, Repr] {
      /** Feeds `elem` to the builder and returns a builder (callers in this file ignore the return value). */
      def +=(elem: A): Builder[A, Repr]
    
      /** The value built from the elements supplied so far. */
      def result: Repr
    }
    
    /** A `Builder` that can additionally be merged with another combiner.
      *
      * In this file, `ParIterableLike.filter` reduces the per-child combiners pairwise
      * with `combine`, left to right.
      */
    trait Combiner[A, Repr] extends Builder[A, Repr] {
      /** Returns a combiner representing `this` merged with `that`.
        * NOTE(review): whether the receiver or `that` stay usable afterwards is not specified here - confirm.
        */
      def combine(that: Combiner[A, Repr]): Combiner[A, Repr]
    }
    
    /** Extends `IterableLike` with access to a `Splitter` over the collection's elements.
      *
      * @tparam A    element type
      * @tparam Repr representation type passed through to `IterableLike`
      */
    trait SplittableLike[A, Repr] extends IterableLike[A, Repr] {
      /** A splitter over this collection; used by the parallel operations in this file. */
      def splitter: Splitter[A] // on every parallel collection
    }
    
    /** Sequential operations shared by every collection, implemented on top of `iterator`
      * and `newBuilder`.
      *
      * `Repr` is deliberately unbounded: `SplittableLike` instantiates it with an unbounded
      * type parameter and `ParIterableLike` with a subtype of `ParIterable`, neither of which
      * conforms to `Iterable[A]`.
      *
      * @tparam A    element type
      * @tparam Repr type of collection produced by transforming operations such as `filter`
      */
    trait IterableLike[A, Repr] extends Buildable[A, Repr] {
      /** A fresh iterator over the elements of this collection. */
      def iterator: Iterator[A] // on every collection
    
      /** Folds the elements left to right, starting from `z`.
        *
        * @param z initial accumulator
        * @param f combines the running accumulator with the next element
        */
      def foldLeft[B](z: B)(f: (B, A) => B): B = {
        iterator.foldLeft(z)(f)
      }
    
      /** Builds a new `Repr` containing, in iteration order, the elements for which `p` holds.
        *
        * @param p predicate deciding which elements are kept
        */
      def filter(p: A => Boolean): Repr = {
        val b = newBuilder
        // Walk the iterator explicitly: this trait defines no `foreach`, so a
        // `for (x <- this)` comprehension cannot be desugared here.
        val it = iterator
        while (it.hasNext) {
          val x = it.next()
          if (p(x)) b += x
        }
        b.result
      }
    }
    
    /** Parallel counterparts of the `IterableLike` operations, built on `splitter` and
      * `newCombiner`.
      *
      * @tparam A    element type
      * @tparam Repr type of collection produced by `filter`
      */
    trait ParIterableLike[A, Repr <: ParIterable[A]] extends SplittableLike[A, Repr]
      with Combinable[A, Repr] {
    
      /** Filters this collection, processing sub-splitters in `Future`s.
        *
        * Splitters reporting fewer than 1000 remaining elements are filtered sequentially
        * into a fresh combiner. Larger ones are split, each part is filtered in its own
        * future, and the resulting combiners are merged left to right with `combine`.
        * The calling thread blocks (without timeout) until all parts have finished.
        *
        * @param p predicate deciding which elements are kept
        */
      override def filter(p: A => Boolean): Repr = {
    
        // Sequential leaf: drain the splitter, keeping the elements accepted by `p`.
        def drain(sp: Splitter[A]): Combiner[A, Repr] = {
          val acc = newCombiner
          while (sp.hasNext) {
            val elem = sp.next()
            if (p(elem)) acc += elem
          }
          acc
        }
    
        // Recursive step: split while the threshold is reached, otherwise drain.
        def filterPar(sp: Splitter[A]): Combiner[A, Repr] =
          if (sp.remaining >= 1000) {
            val pending = sp.split.map { part =>
              Future {
                filterPar(part)
              }
            }
            val combiners = Await.result(Future.sequence(pending), Duration.Inf)
            combiners.reduceLeft((left, right) => left.combine(right))
          } else {
            drain(sp)
          }
    
        filterPar(splitter).result
      }
    }
    
    /** Capability of supplying a `Builder` for the representation type `Repr`.
      *
      * @tparam A    element type accepted by the builder
      * @tparam Repr type the builder produces
      */
    trait Buildable[A, Repr] {
      /** A builder for `Repr`; `IterableLike.filter` requests one per call. */
      def newBuilder: Builder[A, Repr] // on every collection
    }
    
    /** Capability of supplying a `Combiner` in addition to the `Builder` from `Buildable`.
      *
      * @tparam A    element type accepted by the combiner
      * @tparam Repr type the combiner produces
      */
    trait Combinable[A, Repr] extends Buildable[A, Repr] {
      /** A combiner for `Repr`; `ParIterableLike.filter` requests one per sequential leaf. */
      def newCombiner: Combiner[A, Repr] // on every parallel collection
    }
    
    /** Template trait for the sequential sequence `MySeq`; adds no members beyond those
      * inherited from `IterableLike`.
      *
      * @tparam T    element type
      * @tparam Repr concrete sequence type, bounded by `MySeq[T]`
      */
    trait MySeqLike[T, Repr <: MySeq[T]] extends IterableLike[T, Repr] {
    
    }
    
    /** Template trait for the parallel sequence `MyParSeq`; adds a parallel `fold`.
      *
      * @tparam T    element type
      * @tparam Repr concrete sequence type, bounded by `MyParSeq[T]`
      */
    trait MyParSeqLike[T, Repr <: MyParSeq[T]] extends ParIterableLike[T, Repr] {
      /** Folds the elements by delegating to `fold` on this collection's `splitter`.
        *
        * @param z seed value handed to the splitter's fold
        * @param f combining function handed to the splitter's fold
        */
      def fold(z: T)(f: (T, T) => T): T = splitter.fold(z)(f)
    }
    
    /** Skeleton of a sequential sequence. Both members are still `???` placeholders and
      * throw `NotImplementedError` when called.
      *
      * The representation type is `MySeq[T]` itself, as required by the `Repr <: MySeq[T]`
      * bound on `MySeqLike`, and the class mixes in `Iterable[T]` so that it qualifies as
      * the representation of a sequential collection.
      *
      * @tparam T element type
      */
    class MySeq[T] extends MySeqLike[T, MySeq[T]] with Iterable[T] {
      /** Not implemented yet. */
      override def iterator: Iterator[T] = ???
    
      /** Not implemented yet. */
      override def newBuilder: Builder[T, MySeq[T]] = ???
    }
    
    /** Skeleton of a parallel sequence. All members are still `???` placeholders and throw
      * `NotImplementedError` when called.
      *
      * Mixes in `ParIterable[T]` so that `MyParSeq[T]` satisfies the `Repr <: ParIterable[A]`
      * bound that `ParIterableLike` places on its representation type.
      *
      * @tparam T element type
      */
    class MyParSeq[T] extends MyParSeqLike[T, MyParSeq[T]] with ParIterable[T] {
      /** Not implemented yet. */
      override def splitter: Splitter[T] = ???
    
      /** Not implemented yet. */
      override def iterator: Iterator[T] = ???
    
      /** Not implemented yet. */
      override def newCombiner: Combiner[T, MyParSeq[T]] = ???
    
      /** Not implemented yet. */
      override def newBuilder: Builder[T, MyParSeq[T]] = ???
    }
    

    相关文章

      网友评论

          本文标题:Scala Parallelization

          本文链接:https://www.haomeiwen.com/subject/slfujttx.html