停止所有后续调用以在响应式扩展中进行订阅

时间:2020-11-28 08:07:51

I am replacing some of my previously written code with reactive extensions.

我正在用反应式扩展替换我之前编写的一些代码。

Previously I was using File.ReadAllLines(FileName) and then loop through all the lines of files and in this loop at some point, I break from the loop and perform some operation on the processed records(which now I want to do in the OnCompleted action of Subscribe method).

以前我使用的是File.ReadAllLines(FileName),然后循环遍历所有文件行,在这个循环中,我在循环中断开并对已处理的记录执行一些操作(现在我想在OnCompleted中执行订阅方法的行动)。

Now I am trying to use this

现在我想尝试使用它

var observableSequence = File.ReadAllLines(FileName).ToObservable();
observableSequence.Subscribe(u=>
{
//doing some thing but at some point I  need to stop receving further calls and go for the OnCompleted action.
});

I understand there is a cheap hack of If condition which wont satisfied after that condition but still I don't want to waste my CPU cycles over processing unnecessary events.

我知道有一个廉价的If条件,在那个条件之后不满意,但我仍然不想浪费我的CPU周期来处理不必要的事件。

The only way I know to stop doing this is by Disposing the subscription but at this point while I am in between subscribing this wont be possible.

我知道停止这样做的唯一方法是通过处理订阅,但此时我在订阅之间这是不可能的。

So is there any way I could ignore all the events at some point and jump straight to OnCompleted.

那么有什么办法可以在某些时候忽略所有事件并直接跳到OnCompleted。

1 个解决方案

#1


Since you want some sort of manual way to stop the observable, you could try this:

由于您需要某种手动方式来停止observable,您可以尝试这样做:

var signal = new Subject<Unit>();
var observableSequence =
    File
        .ReadAllLines(FileName)
        .ToObservable(Scheduler.Default)
        .TakeUntil(signal);
var count = 0;
observableSequence.Subscribe(u=>
{
    if (++count == 100) 
    {
        signal.OnNext(Unit.Default);
    }
}, () => Console.WriteLine(count));

Console.WriteLine(count);

It would be preferable to do it as part of the query though. Something like this:

但最好将其作为查询的一部分。像这样的东西:

var observableSequence =
    File
        .ReadAllLines(FileName)
        .ToObservable(Scheduler.Default)
        .TakeWhile((x, n) => n < 10);
var count = 0;
observableSequence.Subscribe(u => ++count, () => Console.WriteLine(count));

Console.WriteLine(count);

#1


Since you want some sort of manual way to stop the observable, you could try this:

由于您需要某种手动方式来停止observable,您可以尝试这样做:

var signal = new Subject<Unit>();
var observableSequence =
    File
        .ReadAllLines(FileName)
        .ToObservable(Scheduler.Default)
        .TakeUntil(signal);
var count = 0;
observableSequence.Subscribe(u=>
{
    if (++count == 100) 
    {
        signal.OnNext(Unit.Default);
    }
}, () => Console.WriteLine(count));

Console.WriteLine(count);

It would be preferable to do it as part of the query though. Something like this:

但最好将其作为查询的一部分。像这样的东西:

var observableSequence =
    File
        .ReadAllLines(FileName)
        .ToObservable(Scheduler.Default)
        .TakeWhile((x, n) => n < 10);
var count = 0;
observableSequence.Subscribe(u => ++count, () => Console.WriteLine(count));

Console.WriteLine(count);