使用反应性扩展(RX),是否可以添加“暂停”命令?

时间:2021-07-01 20:17:08

I have a class which takes in a stream of events, and pushes out another stream of events.

我有一个类,它接收事件流,并推出另一个事件流。

All of the events use Reactive Extensions (RX). The incoming stream of events is pushed from an external source into an IObserver<T> using .OnNext, and the outgoing stream of events is pushed out using IObservable<T> and .Subscribe. I am using Subject<T> to manage this, behind the scenes.

所有事件都使用反应性扩展(RX)。使用. onnext将来自外部的事件流推入IObserver ,使用IObservable 和. subscribe将事件的传出流推出。我正在使用Subject 来管理这个,在幕后。

I am wondering what techniques there are in RX to pause the output temporarily. This would mean that incoming events would build up in an internal queue, and when they are unpaused, the events would flow out again.

我想知道RX中有哪些技术可以暂时暂停输出。这意味着传入事件将在内部队列中构建,当它们未暂停时,事件将再次流出。

3 个解决方案

#1


4  

Here is my solution using Buffer and Window operators:

下面是我使用缓冲区和窗口操作符的解决方案:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}

Window is opened at subscription and every time 'true' is received from pauser stream. It closes when pauser provides 'false' value.

窗口在订阅时打开,每次从pauser流接收“true”。当pauser提供“false”值时关闭。

Buffer does what it supposed to do, buffers values that are between 'false' and 'true' from pauser. Once Buffer receives 'true' it outputs IList of values that are instantly streamed all at once.

Buffer做它应该做的事情,它会缓存pauser中介于'false'和'true'之间的值。一旦缓冲区接收到“true”,它就会立即输出所有的值。

DotNetFiddle link: https://dotnetfiddle.net/vGU5dJ

DotNetFiddle链接:https://dotnetfiddle.net/vGU5dJ

#2


2  

Here's a reasonably simple Rx way to do what you want. I've created an extension method called Pausable that takes a source observable and a second observable of boolean that pauses or resumes the observable.

这里有一种相当简单的Rx方法来做你想做的事情。我创建了一个扩展方法,叫做paable,它使用一个可观察的源和第二个可观察的布尔值来暂停或恢复可观察值。

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

它可以这样使用:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Now, the only thing I couldn't quite work out what you mean by your "incoming stream of events is an IObserver<T>". Streams are IObservable<T>. Observers aren't streams. It sounds like you're not doing something right here. Can you add to your question and explain further please?

现在,我唯一搞不懂你所说的“传入事件流”是什么意思的是IObserver 。流是IObservable < T >。观察家不流。听起来你好像没在做什么。你能补充你的问题并进一步解释吗?

#3


1  

You can simulate pausing/unpausing with an Observable.

您可以使用可观察值模拟暂停/取消暂停。

Once your pauseObservable emits a 'paused' value, buffer events until pauseObservable emits an 'unpaused' value.

一旦暂停观察值发出“暂停”值,就会产生缓冲区事件,直到暂停观察值发出“未暂停”值。

Here's an example which uses BufferUntil implementation by Dave Sexton and Observable logic by Timothy Shields (from my own question a while back)

下面是一个例子,它使用了BufferUntil,由Dave Sexton执行,Timothy Shields(从我自己的问题)中可以观察到的逻辑

        //Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

       //Simulate pausing from Keyboard, not actually relevant within this answer
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
            .DistinctUntilChanged();

        //Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) //not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        //When paused, buffer until not paused
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

Room for improvement : maybe merge the two subscriptions to source (pausedEvents and notPausedEvents) as one.

改进的空间:可能合并两个订阅源(暂停和不暂停)作为一个。

#1


4  

Here is my solution using Buffer and Window operators:

下面是我使用缓冲区和窗口操作符的解决方案:

public static IObservable<T> Pausable<T>(this IObservable<T> source, IObservable<bool> pauser)
{
    var queue = source.Buffer(pauser.Where(toPause => !toPause),
                              _ => pauser.Where(toPause => toPause))
                      .SelectMany(l => l.ToObservable());

    return source.Window(pauser.Where(toPause => toPause).StartWith(true), 
                         _ => pauser.Where(toPause => !toPause))
                 .Switch()
                 .Merge(queue);
}

Window is opened at subscription and every time 'true' is received from pauser stream. It closes when pauser provides 'false' value.

窗口在订阅时打开,每次从pauser流接收“true”。当pauser提供“false”值时关闭。

Buffer does what it supposed to do, buffers values that are between 'false' and 'true' from pauser. Once Buffer receives 'true' it outputs IList of values that are instantly streamed all at once.

Buffer做它应该做的事情,它会缓存pauser中介于'false'和'true'之间的值。一旦缓冲区接收到“true”,它就会立即输出所有的值。

DotNetFiddle link: https://dotnetfiddle.net/vGU5dJ

DotNetFiddle链接:https://dotnetfiddle.net/vGU5dJ

#2


2  

Here's a reasonably simple Rx way to do what you want. I've created an extension method called Pausable that takes a source observable and a second observable of boolean that pauses or resumes the observable.

这里有一种相当简单的Rx方法来做你想做的事情。我创建了一个扩展方法,叫做paable,它使用一个可观察的源和第二个可观察的布尔值来暂停或恢复可观察值。

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

它可以这样使用:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

Now, the only thing I couldn't quite work out what you mean by your "incoming stream of events is an IObserver<T>". Streams are IObservable<T>. Observers aren't streams. It sounds like you're not doing something right here. Can you add to your question and explain further please?

现在,我唯一搞不懂你所说的“传入事件流”是什么意思的是IObserver 。流是IObservable < T >。观察家不流。听起来你好像没在做什么。你能补充你的问题并进一步解释吗?

#3


1  

You can simulate pausing/unpausing with an Observable.

您可以使用可观察值模拟暂停/取消暂停。

Once your pauseObservable emits a 'paused' value, buffer events until pauseObservable emits an 'unpaused' value.

一旦暂停观察值发出“暂停”值,就会产生缓冲区事件,直到暂停观察值发出“未暂停”值。

Here's an example which uses BufferUntil implementation by Dave Sexton and Observable logic by Timothy Shields (from my own question a while back)

下面是一个例子,它使用了BufferUntil,由Dave Sexton执行,Timothy Shields(从我自己的问题)中可以观察到的逻辑

        //Input events, hot observable
        var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => i.ToString())
            .Publish().RefCount();

       //Simulate pausing from Keyboard, not actually relevant within this answer
        var pauseObservable = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            k => KeyPressed += k, k => KeyPressed -= k)
            .Select(i => i.EventArgs.PressedKey)
            .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause
            .DistinctUntilChanged();

        //Let events through when not paused
        var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused})
            .Where(i => !i.paused) //not paused
            .Select(i => i.value)
            .Subscribe(Console.WriteLine);

        //When paused, buffer until not paused
        var pausedEvents = pauseObservable.Where(i => i)
            .Subscribe(_ =>
                source.BufferUntil(pauseObservable.Where(i => !i))
                    .Select(i => String.Join(Environment.NewLine, i))
                    .Subscribe(Console.WriteLine));

Room for improvement : maybe merge the two subscriptions to source (pausedEvents and notPausedEvents) as one.

改进的空间:可能合并两个订阅源(暂停和不暂停)作为一个。