泛函编程(22)-泛函数据类型-Monoid In Action

时间:2023-01-16 05:38:22

在上一节我们讨论了Monoid的结合性和恒等值的作用以及Monoid如何与串类元素折叠算法相匹配。不过我们只示范了一下基础类型(primitive type)Monoid实例的应用,所以上一节的讨论目的是理论多于实践。在这一节我们将把重点放在一些实用综合类型(composite type)Monoid实例及Monoid的抽象表达及函数组合能力。

Monoid的二元操作函数具有结合特性(associativity),与恒等值(identity)共同应用可以任意采用左折叠或右折叠算法处理串类元素(List element)而得到同等结果。所以使用Monoid op我们可以得出左折叠等于右折叠的结论:

左折叠:op(op(op(a,b),c),d)

右折叠:op(a,op(b,op(c,d)))

但是,如果能够用并行算法的话就是:

并行算法:op(op(a,b),op(c,d)) 我们可以同时运算 op(a,b), op(c,d)

如果我们可以使用并行算法的话,肯定能提升计算效率。试想如果我们对一个超大文件进行文字数统计或者寻找最大值什么的,我们可以把这个大文件分成若干小文件然后同时计算后再合计将节省很多计算时间。在现代大数据时代,数据文件不但大而且是分布在许多服务器上的。那么Monoid的特殊定律就可以使数据处理并行运算,特别匹配大数据处理模式。

我们看看能不能实现像折叠算法似的并行算法:

   def foldMapV[A,B](iseq: IndexedSeq[A])(m: Monoid[B])(f: A => B): B = {
if (iseq.isEmpty) m.zero
else if (iseq.length == 1) f(iseq(0))
else {
val (l, r) = iseq.splitAt(iseq.length / 2)
m.op(foldMapV(l)(m)(f), foldMapV(r)(m)(f))
}
} //> foldMapV: [A, B](iseq: IndexedSeq[A])(m: ch10.ex2.Monoid[B])(f: A => B)B

啊,这个foldMapV从外表,在类型款式上跟foldMap相同,不同的是内部具体实现方式;foldMapV循环把目标串进行分半。

结合前面对并行运算的讨论,我们用以下方式应该可以实现并行运算吧:

   def foldMapV[A,B](iseq: IndexedSeq[A])(m: Monoid[B])(f: A => B): B = {
if (iseq.isEmpty) m.zero
else if (iseq.length == 1) f(iseq(0))
else {
val (l, r) = iseq.splitAt(iseq.length / 2)
m.op(async(foldMapV(l)(m)(f)), async(foldMapV(r)(m)(f)))
}
} //> foldMapV: [A, B](iseq: IndexedSeq[A])(m: ch10.ex2.Monoid[B])(f: A => B)B

好,我们下面找个例子来示范高阶类型Monoid实例和并行运算应用:用Monoid来实现对字串(List[String])的文字数统计。由于我们打算采用并行计算,对字串进行分半时会可能会出现一字分成两半的情况,所以需要自定义复杂一点的数据类型:

   trait WC
case class Stub(chars: String) extends WC //chars 记录了未完整文字的字符
case class Part(lStub: String, words: Int, rStub: String) extends WC //lStub=左边文字结尾, words=完整字数,rStub=右边文字开头 def wcMonoid: Monoid[WC] = new Monoid[WC] {
def op(wc1: WC, wc2: WC): WC = {
(wc1, wc2) match {
case (Stub(l),Stub(r)) => Stub(l + r)
case (Stub(lb),Part(le,w,rb)) => Part(lb+le,w,rb)
case (Part(le,w,rb),Stub(re)) => Part(le,w,rb+re)
case (Part(le,w,rb),Part(lb,w2,re)) => Part(le, w + (if((rb+lb).isEmpty) 0 else 1) + w2, re)
}
}
val zero = Stub("")
}

Monoid[WC]是个WC类型的Monoid实例,op(wc1,wc2)=wc3则把两个WC值拼凑起来变成另一个WC值。下面让我们用wcMonoid来实现这个文字统计函数:

   def wordCount(ws: String): Int = {
def wc(c: Char): WC = {
if (c.isWhitespace) Part("",0,"")
else Stub(c.toString)
}
def unstub(s: String) = s.length min 1 foldMapV(ws.toIndexedSeq)(wcMonoid)(wc) match {
case Stub(c) => 0
case Part(l,w,r) => unstub(l) + w + unstub(r)
}
} //> wordCount: (ws: String)Int

用它来数个字数:

   val words = "the brown fox     is running quickly."  //故意留点空格,标点符号什么的
//> words : String = the brown fox is running quickly.
wordCount(words) //> res0: Int = 6 

没错!

再来一个例子:检查一串数字是否是有序的:

   def seqIsOrdered(iseq: IndexedSeq[Int]): Boolean = {
val stateMonoid = new Monoid[Option[(Int, Int, Boolean)]] { //状态:Option[(最小值,最大值,是排序)]
def op(s1: Option[(Int,Int,Boolean)], s2: Option[(Int,Int,Boolean)]): Option[(Int,Int,Boolean)] = {
(s1,s2) match {
case (None, b) => b
case (b, None) => b
case (Some((x1,y1,b1)),Some((x2,y2,b2))) => Some(x1 min x2,y1 max y2, b1 && b2 && x2 >= y1)
}
}
val zero = None
}
foldMapV(iseq)(stateMonoid)(i => Some(i,i,true)) map (_._3) getOrElse true
}
   seqIsOrdered(List(1,2,5,9,33).toIndexedSeq)     //> res0: Boolean = true
seqIsOrdered(List(1,2,5,0 ,33).toIndexedSeq) //> res1: Boolean = false

在这个例子里我们用Option[min,max,ordered]作为当前状态并用stateMonoid来处理这个状态。foldMapV参数(i => Some(i,i,true))就是标准的 A => B。还记得吗,我们增加foldMap这个函数是的目的是如果元素A没有Monoid实例,那么我们可以用Monoid[B]然后用A =>B函数把A转成B才能使用Monoid[B]。这里我们把 i转成Some(Int,Int,Boolean)。

值得注意的是以上两个例子foldMapV历遍无论如何是不会中途退出的。这个特点把foldMapV的使用局限在必须消耗整个数据源的计算应用,如求和、最大值等等。对于另外一些要求,如:A => Boolean这种要求,即使第一个值就已经得到答案也必须走完整串数据。

我们在之前的章节里曾经讨论了一些数据结构如List,Stream,Tree等。当我们需要处理这些结构中封装的元素时通常使用一些算法如折叠算法。这种算法能保存数据结构。而且它们有共通性:都可以使用折叠算法。既然有共性,肯定就会有深度抽象的空间,我们可以把它们抽象表达成一个Foldable[F[_]]:List,Stream,Tree等数据结构类型就是F[_];一个数据结构中封装了一些元素。这个Foldable类型可以包含各种历遍算法:

  def endoComposeMonoid[A] = new Monoid[A => A] {
def op(f: A => A, g: A => A): A => A = f compose g
val zero = (a: A) => a
}
def dual[A](m: Monoid[A]): Monoid[A] = new Monoid[A] {
def op(a1: A, a2: A) = m.op(a2,a1)
val zero = m.zero
}
trait Foldable[F[_]] {
def foldRight[A, B](as: F[A])(z: B)(f: (A, B) => B): B = {
foldMap(as)(endoComposeMonoid[B])(a => b => f(a,b))(z)
}
def foldLeft[A, B](as: F[A])(z: B)(f: (B, A) => B): B = {
foldMap(as)(dual(endoComposeMonoid[B]))(a => b => f(b,a))(z)
}
def foldMap[A, B](as: F[A])(mb: Monoid[B])(f: A => B): B = {
foldLeft(as)(mb.zero)((b,a) => mb.op(f(a),b))
}
def concatenate[A](as: F[A])(m: Monoid[A]): A = {
foldLeft(as)(m.zero)(m.op)
}
}

我们现在已经得到了个Foldable抽象数据结构,它包含了多种折叠历遍算法。我们可以试着创建一些Foldable实例看看:

   object listFoldable extends Foldable[List] {
override def foldRight[A, B](as: List[A])(z: B)(f: (A, B) => B): B = {
as.foldRight(z)(f)
}
override def foldLeft[A, B](as: List[A])(z: B)(f: (B, A) => B): B = {
as.foldLeft(z)(f)
}
override def foldMap[A, B](as: List[A])(mb: Monoid[B])(f: A => B): B = {
as.foldLeft(mb.zero)((b,a) => mb.op(f(a),b))
}
override def concatenate[A](as: List[A])(m: Monoid[A]): A = {
as.foldLeft(m.zero)(m.op)
}
}
object indexedSeqFoldable extends Foldable[IndexedSeq] {
override def foldRight[A, B](as: IndexedSeq[A])(z: B)(f: (A, B) => B): B = {
as.foldRight(z)(f)
}
override def foldLeft[A, B](as: IndexedSeq[A])(z: B)(f: (B, A) => B): B = {
as.foldLeft(z)(f)
}
override def foldMap[A, B](as: IndexedSeq[A])(mb: Monoid[B])(f: A => B): B = {
as.foldLeft(mb.zero)((b,a) => mb.op(f(a),b))
}
override def concatenate[A](as: IndexedSeq[A])(m: Monoid[A]): A = {
as.foldLeft(m.zero)(m.op)
}
}
object StreamFoldable extends Foldable[Stream] {
override def foldRight[A, B](as: Stream[A])(z: B)(f: (A, B) => B): B = {
as.foldRight(z)(f)
}
override def foldLeft[A, B](as: Stream[A])(z: B)(f: (B, A) => B): B = {
as.foldLeft(z)(f)
}
}

再看看Tree foldable 实例:

  trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]
object TreeFoldable extends Foldable[Tree] {
override def foldMap[A, B](as: Tree[A])(mb: Monoid[B])(f: A => B): B = {
as match {
case Leaf(a) => f(a)
case Branch(l,r) => mb.op(foldMap(l)(mb)(f),foldMap(r)(mb)(f))
}
}
override def foldRight[A, B](as: Tree[A])(z: B)(f: (A, B) => B): B = {
as match {
case Leaf(a) => f(a,z) //恒等值定律
case Branch(l,r) => foldRight(l)(foldRight(r)(z)(f))(f)
}
}
override def foldLeft[A, B](as: Tree[A])(z: B)(f: (B,A) => B): B = {
as match {
case Leaf(a) => f(z,b)
case Branch(l,r) => foldLeft(r)(foldLeft(l)(z)(f))(f)
}
}
}

可以看出TreeFoldable的实现方式与List,Stream等串类数据类型有所不同。这是因为Tree类型没有现成的折叠算法。再就是Tree类型没有空值(只有Leaf, Branch)。这个特性暗示着有些类型的Monoid是没有恒等值的。我们统称这些类型为semigroup。

Option的foldable与TreeFoldable很像:

   object OptionFoldable extends Foldable[Option] {
override def foldMap[A, B](opt: Option[A])(mb: Monoid[B])(f: A => B): B = {
opt match {
case None => mb.zero
case Some(a) => f(a)
}
}
override def foldRight[A, B](opt: Option[A])(z: B)(f: (A, B) => B): B = {
opt match {
case None => z
case Some(a) => f(a,z)
}
}
override def foldLeft[A, B](opt: Option[A])(z: B)(f: (B,A) => B): B = {
opt match {
case None => z
case Some(a) => f(z,a)
}
}
}

实际上任何可折叠的数据类型都可以被转换成List类型,因为我们可以用折叠算法重组List:

  trait Foldable[F[_]] {
def foldRight[A, B](as: F[A])(z: B)(f: (A, B) => B): B = {
foldMap(as)(endoComposeMonoid[B])(a => b => f(a,b))(z)
}
def foldLeft[A, B](as: F[A])(z: B)(f: (B, A) => B): B = {
foldMap(as)(dual(endoComposeMonoid[B]))(a => b => f(b,a))(z)
}
def foldMap[A, B](as: F[A])(mb: Monoid[B])(f: A => B): B = {
foldLeft(as)(mb.zero)((b,a) => mb.op(f(a),b))
}
def concatenate[A](as: F[A])(m: Monoid[A]): A = {
foldLeft(as)(m.zero)(m.op)
}
def toList[A](as: F[A]): List[A] = {
foldRight(as)(Nil: List[A]){_ :: _}
}
}

Monoid的函数组合能力也挺有趣:如果我们有Monoid[A], Monoid[B],那我们就可以把它们组合成Monoid[(A,B)]:

   def productMonoid[A,B](ma: Monoid[A], mb: Monoid[B]): Monoid[(A,B)] = new Monoid[(A,B)] {
def op(x: (A,B), y: (A,B)) = (ma.op(x._1, y._1), mb.op(x._2,y._2))
val zero = (ma.zero, mb.zero)
} //> productMonoid: [A, B](ma: ch10.ex2.Monoid[A], mb: ch10.ex2.Monoid[B])ch10.e
//| x2.Monoid[(A, B)]

我们可以用这个组合的Monoid在历遍一个List时同时计算List长度及和:

  val pm = productMonoid(intAdditionMonoid,intAdditionMonoid)
//> pm : ch10.ex2.Monoid[(Int, Int)] = ch10.ex2$$anonfun$main$1$$anon$6@614c55
//| 15
listFoldable.foldMap(List(1,2,3,4))(pm)(a => (1, a))
//> res0: (Int, Int) = (4,10)

在历遍过程中我们把List每个节点元素值转成一对值 a => (1, a),然后分别对每个成员实施intAdditionMonoid的op操作。

下面剩下的时间我们再讨论一些较复杂的Monoid:

如果一个函数的结果是Monoid,我们可以实现这个函数的Monoid实例:

   def functionMonoid[A,B](mb: Monoid[B]): Monoid[A => B] = new Monoid[A => B] {
def op(f: A => B, g: A => B): A => B = a => mb.op(f(a),g(a))
val zero: A => B = a => mb.zero
}

实现这个Monoid实例的应当尽量从类型匹配入手:函数A => B的结果是B;我们有Monoid[B],Monoid[B].op(b,b)=>b。

再来一个合并key-value Map的Monoid实例:如果我们有value类型的Monoid实例就可以实现:

   def mapMergeMonoid[K,V](mv: Monoid[V]): Monoid[Map[K,V]] = new Monoid[Map[K,V]] {
val zero = Map()
def op(ma: Map[K,V], mb: Map[K,V]): Map[K,V] = {
ma.map {
case (k,v) => (k, mv.op(v, mb get(k) getOrElse mv.zero ))
}
}
}

有了这个Monoid实例,我们就可以处理Map的嵌入表达式了:

   val M: Monoid[Map[String, Map[String, Int]]] = mapMergeMonoid(mapMergeMonoid(intAdditionMonoid))
M: Monoid[Map[String, Map[String, Int]]] = $anon$1@21dfac82 val m1 = Map("o1" -> Map("i1" -> 1, "i2" -> 2))
m1: Map[String,Map[String,Int]] = Map(o1 -> Map(i1 -> 1, i2 -> 2)) val m2 = Map("o1" -> Map("i2" -> 3))
m2: Map[String,Map[String,Int]] = Map(o1 -> Map(i2 -> 3)) val m3 = M.op(m1, m2)
m3: Map[String,Map[String,Int]] = Map(o1 -> Map(i1 -> 1, i2 -> 5))

最后,我们试着用mapMergeMonoid实例来实现frequencyMap:计算输入List里的文字发现次数。如果用一个例子来说明的话,看看下面这个一串文字转成key-value Map:

Vector("a rose", "is a", "rose is", "a rose") >>> Map(a -> 3, rose -> 3, is -> 2)

这不就是搜索引擎中的索引比重算法吗?我们用foldMapV和mapMergeMonoid可以并行运算整理索引,这算是Monoid的实际应用之一。

我们看看具体实现:

 def frequencyMap[A](as: IndexedSeq[A]): Map[A, Int] =
foldMapV(as, mapMergeMonoid[A, Int](intAddition))((a: A) => Map(a -> 1))
 frequencyMap(Vector("a rose", "is a", "rose is", "a rose"))
res0: Map[String,Int] = Map(a -> 3, rose -> 3, is -> 2)