It is possible to pull on demand from a number (say two for simplicity) of sources using streams (lazy lists). Iteratees can be used to process data coming from a single source.
可以使用流(懒惰列表)从一个数字(例如两个为简单)中提取需求。 Iteratees可用于处理来自单一来源的数据。
Is there an Iteratee-like functional concept for processing multiple input sources? I could imagine an Iteratee whose state signals from which source does it want to pull.
是否有类似Iteratee的功能概念来处理多个输入源?我可以想象一个Iteratee,它的状态信号来自它想要拉动的源。
4 个解决方案
#1
15
To do this using pipes you nest the Pipe monad transformer within itself, once for each producer you wish to interact with. For example:
要使用管道执行此操作,您可以将Pipe monad转换器嵌套在其自身内,对于您希望与之交互的每个生产者。例如:
import Control.Monad
import Control.Monad.Trans
import Control.Pipe
producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]
consumes2 :: (Show a, Show b) =>
Consumer a (Consumer b IO) r
consumes2 = forever $ do
a <- await -- await from outer producer
b <- lift await -- await from inner producer
lift $ lift $ print (a, b)
Just like a Haskell curried function of multiple variables, you partially apply it to each source using composition and runPipe:
就像多个变量的Haskell curried函数一样,您可以使用composition和runPipe将其部分应用于每个源:
consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA
fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB
The above function outputs when run:
上述功能在运行时输出:
>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)
This trick works for yielding or awaiting to any number of pipes upstream or downstream. It also works for proxies, the bidirectional analogs to pipes.
这个技巧适用于屈服或等待任何数量的上游或下游管道。它也适用于代理,管道的双向模拟。
Edit: Note that this also works for any iteratee library, not just pipes
. In fact, John Milikin and Oleg were the original advocates for this approach and I just stole the idea from them.
编辑:请注意,这也适用于任何iteratee库,而不仅仅是管道。事实上,John Milikin和Oleg是这种方法的最初倡导者,我只是从他们那里偷走了这个想法。
#2
6
We're using Machines in Scala to pull in not just two, but an arbitrary amount of sources.
我们在Scala中使用机器不仅可以引入两个,而且可以使用任意数量的源。
Two examples of binary joins are provided by the library itself, on the Tee
module: mergeOuterJoin
and hashJoin
. Here is what the code for hashJoin
looks like (it assumes both streams are sorted):
库本身在Tee模块上提供了两个二进制连接示例:mergeOuterJoin和hashJoin。以下是hashJoin的代码(假设两个流都已排序):
/**
* A natural hash join according to keys of type `K`.
*/
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
a <- awaits(left[A])
mp <- build(m + (f(a) -> a))
} yield mp) orElse Return(m)
for {
m <- build(Map())
r <- (awaits(right[B]) flatMap (b => {
val k = g(b)
if (m contains k) emit(m(k) -> b) else Return(())
})) repeatedly
} yield r
}
This code builds up a Plan
which is "compiled" to a Machine
with the repeatedly
method. The type being built here is Tee[A, B, (A, B)]
which is a machine with two inputs. You request inputs on the left and right with awaits(left)
and awaits(right)
, and you output with emit
.
此代码构建了一个计划,该计划使用重复方法“编译”到计算机。这里构建的类型是Tee [A,B,(A,B)],它是一台带有两个输入的机器。您可以通过等待(左)和等待(右)请求左右输入,然后使用emit输出。
There is also a Haskell version of Machines.
还有一个Haskell版本的机器。
#3
3
Conduits (and, it can be built for Pipes, but that code hasn't been released yet) has a zip
primitive that takes two upstreams and combines them as a stream of tuples.
管道(并且,它可以为Pipes构建,但该代码尚未发布)具有一个zip原语,它采用两个上游并将它们组合为元组流。
#4
1
Check out the pipes library, where vertical concatenation might do what you want. For example,
检查管道库,垂直连接可能会执行您想要的操作。例如,
import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void
source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"
sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x
pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)
The sequencing operator (>>)
vertically concatenates the sources, yielding the output (on a runPipe
)
排序运算符(>>)垂直连接源,产生输出(在runPipe上)
's'
'a'
'y'
'w'
'h'
'a'
't'
#1
15
To do this using pipes you nest the Pipe monad transformer within itself, once for each producer you wish to interact with. For example:
要使用管道执行此操作,您可以将Pipe monad转换器嵌套在其自身内,对于您希望与之交互的每个生产者。例如:
import Control.Monad
import Control.Monad.Trans
import Control.Pipe
producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]
consumes2 :: (Show a, Show b) =>
Consumer a (Consumer b IO) r
consumes2 = forever $ do
a <- await -- await from outer producer
b <- lift await -- await from inner producer
lift $ lift $ print (a, b)
Just like a Haskell curried function of multiple variables, you partially apply it to each source using composition and runPipe:
就像多个变量的Haskell curried函数一样,您可以使用composition和runPipe将其部分应用于每个源:
consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA
fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB
The above function outputs when run:
上述功能在运行时输出:
>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)
This trick works for yielding or awaiting to any number of pipes upstream or downstream. It also works for proxies, the bidirectional analogs to pipes.
这个技巧适用于屈服或等待任何数量的上游或下游管道。它也适用于代理,管道的双向模拟。
Edit: Note that this also works for any iteratee library, not just pipes
. In fact, John Milikin and Oleg were the original advocates for this approach and I just stole the idea from them.
编辑:请注意,这也适用于任何iteratee库,而不仅仅是管道。事实上,John Milikin和Oleg是这种方法的最初倡导者,我只是从他们那里偷走了这个想法。
#2
6
We're using Machines in Scala to pull in not just two, but an arbitrary amount of sources.
我们在Scala中使用机器不仅可以引入两个,而且可以使用任意数量的源。
Two examples of binary joins are provided by the library itself, on the Tee
module: mergeOuterJoin
and hashJoin
. Here is what the code for hashJoin
looks like (it assumes both streams are sorted):
库本身在Tee模块上提供了两个二进制连接示例:mergeOuterJoin和hashJoin。以下是hashJoin的代码(假设两个流都已排序):
/**
* A natural hash join according to keys of type `K`.
*/
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
a <- awaits(left[A])
mp <- build(m + (f(a) -> a))
} yield mp) orElse Return(m)
for {
m <- build(Map())
r <- (awaits(right[B]) flatMap (b => {
val k = g(b)
if (m contains k) emit(m(k) -> b) else Return(())
})) repeatedly
} yield r
}
This code builds up a Plan
which is "compiled" to a Machine
with the repeatedly
method. The type being built here is Tee[A, B, (A, B)]
which is a machine with two inputs. You request inputs on the left and right with awaits(left)
and awaits(right)
, and you output with emit
.
此代码构建了一个计划,该计划使用重复方法“编译”到计算机。这里构建的类型是Tee [A,B,(A,B)],它是一台带有两个输入的机器。您可以通过等待(左)和等待(右)请求左右输入,然后使用emit输出。
There is also a Haskell version of Machines.
还有一个Haskell版本的机器。
#3
3
Conduits (and, it can be built for Pipes, but that code hasn't been released yet) has a zip
primitive that takes two upstreams and combines them as a stream of tuples.
管道(并且,它可以为Pipes构建,但该代码尚未发布)具有一个zip原语,它采用两个上游并将它们组合为元组流。
#4
1
Check out the pipes library, where vertical concatenation might do what you want. For example,
检查管道库,垂直连接可能会执行您想要的操作。例如,
import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void
source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"
sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x
pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)
The sequencing operator (>>)
vertically concatenates the sources, yielding the output (on a runPipe
)
排序运算符(>>)垂直连接源,产生输出(在runPipe上)
's'
'a'
'y'
'w'
'h'
'a'
't'