美文网首页
纯函数式的并行计算(1)

纯函数式的并行计算(1)

作者: 吐思圈 | 来源:发表于2018-03-11 17:11 被阅读0次

选择数据类型和函数
“创建并行计算”具体是指什么?我们可以从一个相对简单的例子入手——求一组整数的和。例如下面就是利用左折叠的方法计算求和:

  def sum(ints: Seq[Int]): Int =
    ints.foldLeft(0)(_ + _)

除了叠加算法, 还有一个分治的算法,代码如下:

  def sum1(ints: IndexedSeq[Int]): Int =
    if (ints.length < 1)
      ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length)
      sum1(l) + sum1(r)
    }

我们使用splitAt函数将序列一分为二,并各自递归求和,最后合并它们的结果。这种实现可以实现并行化,即对两部分的求和可以同时进行。
一种用于并行计算的数据类型
用于表示并行计算的任何数据类型都包含一个结果,这个结果是一个有意义的类型(这里是Int),且能够获取。为此我们设计一个这样的数据类型Par[A](Par是Paralle的简写),它就像一个装有结果的容器,并具备下面的方法:

trait Par[A] {

}

object Par {

  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: => A): Par[A] = ???

  //从并行计算中抽取结果
  def get[A](pa: Par[A]): A = ???
}

现在我们用自定义的数据类型更新求和算法:

  import Par._

  def sum2(ints: IndexedSeq[Int]): Int =
    if (ints.length <= 1)
      ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length)
      val pl = unit(sum2(l))
      val pr = unit(sum2(r))
      get(pl) + get(pr)
    }

现在我们面临一个选择, 是让unit在一个独立的逻辑线程中立即求值,还是等到get被调用的时候再求值。unit立即求值会导致程序无法并行计算,但是unit返回一个代表一步计算的Par[Int],那在调用get的时候无法避免产生副作用。如何才能避免unit和get的缺陷呢?
组合并行计算
我们可以不调用get函数, 那么函数的代码将如下:

  def sum3(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
      unit(ints.headOption.getOrElse(0))
    else {
      val (l, r) = ints.splitAt(ints.length)
      map2(sum3(l), sum3(r))(_ + _)
    }

练习 7.1
Par.map2是一个新的高阶函数,用于组合两个并行计算的结果。实现map2函数:

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
    val a = get(pa)
    val b = get(pb)
    unit(f(a, b))
  }

显性分流
目前的API没有明确的表明何时应该将计算从主线程中分流出去,换句话说程序员也不知道在哪儿会发生并行计算。如何让分流更加明确呢?我们引入另一个函数来做:

  //将par[A]分配另一个独立的线程中去运行
  def folk[A](pa: => Par[A]): Par[A] = ???

让我们来重写sum函数:

  def sum4(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
      unit(ints.headOption.getOrElse(0))
    else {
      val (l, r) = ints.splitAt(ints.length)
      map2(folk(sum3(l)), folk(sum3(r)))(_ + _)
    }

对于length <= 1情况我们并不需要folk到一个独立线程中计算。
现在回到unit是严格还是惰性的问题,有了folk,即便unit是严格也不会有什么损失。至于非严格版本我们叫它lazyUnit吧:

  //接受一个已求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: A): Par[A] = ???
  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))

到此,我们看出Par只是一个值的函数,表明并行计算。而实际执行并行计算的是get函数,所以我们将get函数改名为run函数,表明这个并行计算实际执行的地方。

  //从并行计算中抽取结果
  def run[A](pa: Par[A]): A = ???

确定表现形式
经过各种思考和选择之后,我们有了下面大致的API。

  //接受一个已求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: A): Par[A] = ???
  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
  //从并行计算中抽取结果
  def run[A](pa: Par[A]): A = ???
  //将par[A]分配另一个独立的线程中去运行
  def folk[A](pa: => Par[A]): Par[A] = ???
  
  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
    val a = run(pa)
    val b = run(pb)
    unit(f(a, b))
  }

练习 7.2
在继续之前,我们尽可能实现API中的函数
如上代码
让我们根据run函数反推Par类型,让我们试着假设run可以访问一个ExecutorService,看能不能搞清Par的样子:

  def run[A](s: ExecutorService)(pa: Par[A]): A = ???

最简单的莫过于,Par[A]是ExecutorService => A,当然这也未免太简单了,为此Par[A],应该是ExecutorService => Future[A],而run直接返回Future:

  type Par[A] = ExecutorService => Future[A]
  def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)

完善API
既然有了Par的表现形式,不妨就简单直接点,基于Par的表现类型最简单的实现:

object Par {

  import java.util.concurrent.{ExecutorService, Future, Callable}
  
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](a: A) extends Future[A] {
    override def isCancelled: Boolean = false

    override def get(): A = a

    override def get(timeout: Long, unit: TimeUnit): A = a

    override def cancel(mayInterruptIfRunning: Boolean): Boolean = false

    override def isDone: Boolean = true
  }
  
  //接受一个已求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: A): Par[A] = es => UnitFuture(a)
  
  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
  
  //从并行计算中抽取结果
  def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)
  
  //将par[A]分配另一个独立的线程中去运行
  def folk[A](pa: => Par[A]): Par[A] = es => {
    es.submit(new Callable[A] {
      override def call(): A = pa(es).get()
    })
  }

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = 
    es => {
      val af = pa(es)
      val bf = pb(es)
      UnitFuture(f(af.get(), bf.get()))
    }
  
}

练习7.3
改进Map2的实现,支持超时设置。

  def map2[A, B, C](pa: Par[A], pb: Par[B], 
                    timeout: Long, timeUnit: TimeUnit)(f: (A, B) => C): Par[C] =
    es => {
      val af = pa(es)
      val bf = pb(es)
      val a = af.get(timeout, timeUnit)
      val b = bf.get(timeout, timeUnit)
      UnitFuture(f(a, b))
    }

练习 7.4
使用lazyUnit写一个函数将另一个函数A => B转换为一个一步计算

  def asyncF[A, B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

练习 7.5
实现一个叫做sequence的函数。不能使用而外的基础函数,不能调用run。

  def sequence[A](li: List[Par[A]]): Par[List[A]] = {
    def loop(n: Int, res: Par[List[A]]): Par[List[A]] = n match {
      case m if m < 0 => res
      case _ => loop(n - 1, map2(li(n), res)(_ :: _))
    }
    loop(li.length - 1, unit(Nil))
  }

练习 7.6
实现parFilter,并过滤列表元素

  def parFilter[A](li: List[A])(f: A => Boolean): Par[List[A]] = {
    def loop(n: Int, res: List[Par[A]]): List[Par[A]] = n match {
      case m if m < 0 => res
      case _ => 
        if (f(li(n))) loop(n - 1, lazyUnit(li(n)) :: res)
        else loop(n - 1, res)
    }
    sequence(loop(li.length - 1, Nil))
  }

相关文章

  • 纯函数式的并行计算(1)

    选择数据类型和函数“创建并行计算”具体是指什么?我们可以从一个相对简单的例子入手——求一组整数的和。例如下面就是利...

  • 纯函数式的并行计算(2)

    打破法则:一个微妙的bug在上篇博文的实现中,实际上会有一个相当微妙的问题出现在大多数folk的实现上,当使用固定...

  • MATLAB分布式并行计算中函数的使用

    2018-05-05 MATLAB分布式并行计算中函数的使用 1、parfor 函数 Execute for-lo...

  • Python进阶笔记

    文|Seraph 函数式编程 1 纯函数式编程:不需要变量、没有副作用、测试简单2 Python不是纯函数式编程(...

  • 为什么要学习Haskell语言?

    1. Haskell是一种纯函数式语言 使用纯函数式语言更加符合数学和逻辑的表达形式 使用纯函数式编程有利于编程技...

  • Clojure学习笔记(三)——函数式编程

    函数式编程的理念 函数式编程使得代码的编写、阅读、测试和重用都更容易了。 纯函数 函数式程序构建于纯函数之上。纯函...

  • JS纯函数 柯里化函数 组合函数

    1.纯函数(Pure Function) 函数式编程中有一个非常重要的概念叫做纯函数,javascript符合函数...

  • 函数式编程(二)

    纯函数 函数式编程中的函数,指的就是纯函数,这也是整个函数式编程的核心纯函数:相同的输入永远会得到相同的输出,而且...

  • 函数式编程

    1 文章目标 为什么要学习函数式编程以及什么是函数式编程 函数式编程的特性(纯函数、柯里化、函数组合等) 函数式编...

  • 【第十五天】函数式与并行运算

    第七章 函数式编程 7.1 1.Python中的函数式 函数式编程强调了函数的纯粹性(purity)一个纯函数是没...

网友评论

      本文标题:纯函数式的并行计算(1)

      本文链接:https://www.haomeiwen.com/subject/mabufftx.html