/**
 * Coursera - Parallel Programming
 * <p>
 * Week 3 - Data Parallelism
 * <p>
 * Homework
 */
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
// Marker trait with no members; the common parent of Iterable and ParIterable.
trait GenIterable[A]
// Marker trait with no members. NOTE: inside this file the name shadows scala.Iterable.
// Presumably tags sequential collections -- confirm against intended usage.
trait Iterable[A] extends GenIterable[A]
// Marker trait with no members. Presumably tags parallel collections -- confirm against
// intended usage.
trait ParIterable[A] extends GenIterable[A]
/**
 * Minimal single-pass iterator. Inside this file the name shadows `scala.Iterator`.
 *
 * @tparam A the element type
 */
trait Iterator[A] {

  /** Returns the next element and advances the iterator. */
  def next(): A

  /** Reports whether another call to `next()` may be made. */
  def hasNext: Boolean

  /**
   * Folds the remaining elements from left to right, consuming the iterator.
   *
   * @param z the initial accumulator, returned unchanged when `hasNext` is false up front
   * @param f combines the accumulator so far with the next element
   * @return the final accumulator
   */
  def foldLeft[B](z: B)(f: (B, A) => B): B = {
    // Tail-recursive, so it compiles down to a loop.
    @scala.annotation.tailrec
    def loop(acc: B): B =
      if (hasNext) loop(f(acc, next()))
      else acc

    loop(z)
  }
}
/**
 * An [[Iterator]] that can be divided into sub-splitters so that its elements can be
 * processed in parallel.
 *
 * @tparam A the element type
 */
trait Splitter[A] extends Iterator[A] {

  /**
   * Divides this splitter into sub-splitters.
   * NOTE(review): the sub-splitters are presumably expected to cover this splitter's
   * remaining elements between them -- confirm with implementations.
   */
  def split: Seq[Splitter[A]]

  /** Size estimate used to choose between sequential and parallel processing. */
  def remaining: Int

  /**
   * Folds the elements with `f`, starting every partial fold from `z`.
   *
   * Splitters reporting fewer than 1000 remaining elements are folded sequentially with
   * `foldLeft`. Larger ones are split, the parts are folded as concurrent `Future`s, and
   * the partial results are folded together again starting from `z`. Because `z` is
   * re-applied at every level it should be a neutral element for `f`, and `f` should be
   * associative, otherwise the result depends on how the splitter was divided.
   *
   * The recursion is composed out of futures and the calling thread blocks exactly once.
   * Waiting inside each pool task instead would park one worker thread per internal
   * split node, which can starve the execution context on large inputs.
   *
   * @param z start value of every partial fold
   * @param f combining function
   * @return the folded value
   */
  def fold(z: A)(f: (A, A) => A): A = {
    val threshold = 1000

    // Non-blocking recursive step: leaves fold sequentially inside a Future, inner nodes
    // split on the pool and then merge their children's results.
    def foldAsync(sp: Splitter[A]): Future[A] =
      if (sp.remaining < threshold) Future(sp.foldLeft(z)(f))
      else
        Future(sp.split)
          .flatMap(parts => Future.traverse(parts)(foldAsync))
          .map(_.foldLeft(z)(f))

    // Small inputs never touch the execution context.
    if (remaining < threshold) foldLeft(z)(f)
    else Await.result(foldAsync(this), Duration.Inf)
  }
}
// Accumulates elements of type A and produces a value of type Repr.
trait Builder[A, Repr] {
// Adds `elem` to this builder. Declared to return a Builder, which allows chained calls.
def +=(elem: A): Builder[A, Repr]
// Returns the Repr assembled by this builder.
def result: Repr
}
// A Builder that can additionally be merged with another Combiner of the same type.
// ParIterableLike.filter uses `combine` to merge partial results.
trait Combiner[A, Repr] extends Builder[A, Repr] {
// Merges this combiner with `that` and returns the resulting combiner.
// NOTE(review): whether the operands remain usable afterwards is not specified here.
def combine(that: Combiner[A, Repr]): Combiner[A, Repr]
}
// Extends IterableLike with access to a Splitter over the collection's elements.
trait SplittableLike[A, Repr] extends IterableLike[A, Repr] {
// Returns a Splitter for this collection.
def splitter: Splitter[A] // on every parallel collection
}
/**
 * Sequential operations implemented on top of `iterator` and `newBuilder`.
 *
 * `Repr` is left unbounded. [[SplittableLike]] and [[ParIterableLike]] extend this trait
 * with representation types that are not subtypes of `Iterable[A]`, so a
 * `Repr <: Iterable[A]` bound here cannot be satisfied by them. Nothing in this trait
 * relies on such a bound.
 *
 * @tparam A    the element type
 * @tparam Repr the collection type produced by `filter`
 */
trait IterableLike[A, Repr] extends Buildable[A, Repr] {

  /** Returns an iterator over the elements of this collection. */
  def iterator: Iterator[A] // on every collection

  /**
   * Folds the elements from left to right.
   *
   * @param z the initial accumulator
   * @param f combines the accumulator with the next element
   * @return the final accumulator
   */
  def foldLeft[B](z: B)(f: (B, A) => B): B = iterator.foldLeft(z)(f)

  /**
   * Builds a new `Repr` containing the elements that satisfy `p`, in iteration order.
   *
   * Walks `iterator` directly, because this trait defines no `foreach` for a
   * `for (x <- this)` loop to expand into.
   *
   * @param p the predicate an element must satisfy to be kept
   * @return the result of a fresh builder fed with the matching elements
   */
  def filter(p: A => Boolean): Repr =
    iterator
      .foldLeft(newBuilder)((b, x) => if (p(x)) b += x else b)
      .result
}
/**
 * Parallel counterpart of [[IterableLike]]: overrides `filter` with an implementation
 * that splits the work across `Future`s and merges the partial results with combiners.
 *
 * @tparam A    the element type
 * @tparam Repr the collection type produced by `filter`
 */
trait ParIterableLike[A, Repr <: ParIterable[A]] extends SplittableLike[A, Repr]
  with Combinable[A, Repr] {

  /**
   * Builds a new `Repr` containing the elements that satisfy `p`.
   *
   * Splitters reporting fewer than 1000 remaining elements are filtered sequentially
   * into a fresh combiner. Larger ones are split, each part is filtered concurrently,
   * and the resulting combiners are merged left to right with `combine`.
   *
   * The recursion is composed out of futures and the calling thread blocks exactly once.
   * Waiting inside each pool task instead would park one worker thread per internal
   * split node, which can starve the execution context on large inputs.
   *
   * @param p the predicate an element must satisfy to be kept
   * @return the result of the merged combiner
   */
  override def filter(p: A => Boolean): Repr = {
    val threshold = 1000

    // Sequential base case: drain the splitter into a fresh combiner.
    def filterSequentially(sp: Splitter[A]): Combiner[A, Repr] = {
      val kept = newCombiner
      while (sp.hasNext) {
        val elem = sp.next()
        if (p(elem)) kept += elem
      }
      kept
    }

    // Non-blocking recursive step: leaves filter inside a Future, inner nodes split on
    // the pool and merge their children's combiners.
    def filterAsync(sp: Splitter[A]): Future[Combiner[A, Repr]] =
      if (sp.remaining < threshold) Future(filterSequentially(sp))
      else
        Future(sp.split)
          .flatMap(parts => Future.traverse(parts)(filterAsync))
          // An empty split yields an empty combiner instead of a reduceLeft failure.
          .map(_.reduceLeftOption(_ combine _).getOrElse(newCombiner))

    val root = splitter
    val combined =
      if (root.remaining < threshold) filterSequentially(root)
      else Await.result(filterAsync(root), Duration.Inf)
    combined.result
  }
}
// Capability of supplying a Builder that produces this collection's representation type.
trait Buildable[A, Repr] {
// Returns a Builder for Repr. IterableLike.filter requests one per call.
def newBuilder: Builder[A, Repr] // on every collection
}
// Capability of supplying a Combiner, in addition to the Builder inherited from Buildable.
trait Combinable[A, Repr] extends Buildable[A, Repr] {
// Returns a Combiner for Repr. ParIterableLike.filter requests one per sequential chunk.
def newCombiner: Combiner[A, Repr] // on every parallel collection
}
// Layer between IterableLike and MySeq; restricts Repr to subtypes of MySeq[T] and
// adds no members of its own.
trait MySeqLike[T, Repr <: MySeq[T]] extends IterableLike[T, Repr] {
}
/**
 * Layer between [[ParIterableLike]] and [[MyParSeq]]; restricts `Repr` to subtypes of
 * `MyParSeq[T]` and adds a `fold` operation.
 *
 * @tparam T    the element type
 * @tparam Repr the collection type produced by transforming operations
 */
trait MyParSeqLike[T, Repr <: MyParSeq[T]] extends ParIterableLike[T, Repr] {

  /**
   * Folds the elements by delegating to the `fold` of this collection's splitter.
   *
   * @param z start value handed to the splitter's fold
   * @param f combining function handed to the splitter's fold
   * @return the value returned by the splitter's fold
   */
  def fold(z: T)(f: (T, T) => T): T = splitter.fold(z)(f)
}
/**
 * Sequential sequence skeleton; its members are still unimplemented (`???`).
 *
 * The representation type is `MySeq[T]` itself, as required by the bound
 * `Repr <: MySeq[T]` on [[MySeqLike]]. This mirrors how [[MyParSeq]] is declared.
 * The class also mixes in the `Iterable[T]` marker trait.
 *
 * @tparam T the element type
 */
class MySeq[T] extends Iterable[T] with MySeqLike[T, MySeq[T]] {

  /** Iterator over the elements; not implemented yet. */
  override def iterator: Iterator[T] = ???

  /** Builder producing a `MySeq[T]`; not implemented yet. */
  override def newBuilder: Builder[T, MySeq[T]] = ???
}
/**
 * Parallel sequence skeleton; its members are still unimplemented (`???`).
 *
 * Mixes in the `ParIterable[T]` marker trait, because [[ParIterableLike]] requires its
 * representation type to be a subtype of `ParIterable`.
 *
 * @tparam T the element type
 */
class MyParSeq[T] extends ParIterable[T] with MyParSeqLike[T, MyParSeq[T]] {

  /** Splitter over the elements; not implemented yet. */
  override def splitter: Splitter[T] = ???

  /** Iterator over the elements; not implemented yet. */
  override def iterator: Iterator[T] = ???

  /** Combiner producing a `MyParSeq[T]`; not implemented yet. */
  override def newCombiner: Combiner[T, MyParSeq[T]] = ???

  /** Builder producing a `MyParSeq[T]`; not implemented yet. */
  override def newBuilder: Builder[T, MyParSeq[T]] = ???
}
// Reader comments