I have an event source that generates events that belong to certain groups. I would like to buffer these groups and send the groups (in batches) to storage. So far I have this:
我有一个事件源,可以生成属于某些组的事件。我想缓冲这些组并将组(分批)发送到存储。到目前为止我有这个:
eventSource
.GroupBy(event => event.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Subscribe(group => group.Events
.Buffer(TimeSpan.FromSeconds(60), 100)
.Subscribe(list => SendToStorage(list)));
So there's a nested subscribe to the events in a group. Somehow I think there's a better way but I haven't been able to figure it out yet.
所以有一个嵌套订阅组中的事件。不知怎的,我觉得有更好的方法,但我还没弄清楚。
2 个解决方案
#1
3
Here's the solution:
这是解决方案:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
Here's a couple general rules that can help you 'reduce':
这里有一些可以帮助你'减少'的一般规则:
1) A nested subscription is normally fixed with Select
ing everything before the nested subscription followed by a Merge
, then followed by the nested subscription. So applying that, you get this:
1)嵌套订阅通常通过在嵌套订阅之前选择所有内容然后是合并,然后是嵌套订阅来修复。所以应用它,你得到这个:
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
.Merge()
.Subscribe(list => SendToStorage(list));
2) You can obviously combine two consecutive selects (and since you're not doing anything with the anonymous object, can just remove that):
2)你显然可以组合两个连续的选择(因为你没有对匿名对象做任何事情,可以删除它):
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Merge()
.Subscribe(list => SendToStorage(list));
3) Finally, a Select
followed by a Merge
can be reduced to a SelectMany
:
3)最后,Select和后面的Merge可以简化为SelectMany:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
#2
1
Here is one way to do it
这是一种方法
(from g in eventSource.GroupByUntil(e => e.GroupingKey,
g => g.Buffer(TimeSpan.FromSeconds(60), 100))
from b in g.ToList()
select b).Subscribe(SendToStorage);
#1
3
Here's the solution:
这是解决方案:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
Here's a couple general rules that can help you 'reduce':
这里有一些可以帮助你'减少'的一般规则:
1) A nested subscription is normally fixed with Select
ing everything before the nested subscription followed by a Merge
, then followed by the nested subscription. So applying that, you get this:
1)嵌套订阅通常通过在嵌套订阅之前选择所有内容然后是合并,然后是嵌套订阅来修复。所以应用它,你得到这个:
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
.Merge()
.Subscribe(list => SendToStorage(list));
2) You can obviously combine two consecutive selects (and since you're not doing anything with the anonymous object, can just remove that):
2)你显然可以组合两个连续的选择(因为你没有对匿名对象做任何事情,可以删除它):
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Merge()
.Subscribe(list => SendToStorage(list));
3) Finally, a Select
followed by a Merge
can be reduced to a SelectMany
:
3)最后,Select和后面的Merge可以简化为SelectMany:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
#2
1
Here is one way to do it
这是一种方法
(from g in eventSource.GroupByUntil(e => e.GroupingKey,
g => g.Buffer(TimeSpan.FromSeconds(60), 100))
from b in g.ToList()
select b).Subscribe(SendToStorage);