如何确保线程正在等待事件?

时间:2021-12-13 21:02:40

I wonder how to make sure that a thread is awaiting for events. Let say I've a component that raises events:

我想知道如何确保一个线程正在等待事件。假设我有一个引发事件的组件:

public delegate void TestHandler(object sender, EventArgs eventArgs);
class Producer
{
    public event TestHandler Handler;
    public void InvokeHandler(EventArgs eventargs)
    {
        var handler = Handler;
        if (handler != null) handler(this, eventargs);
    }

    public Producer()
    {
        Task.Factory.StartNew(() => 
        {
            while (true)
            {
                Thread.Sleep( (new Random()).Next(0,100) );
                InvokeHandler(new EventArgs());
            }  });  } }

And a Listener:

和一个侦听器:

class Listener
{
    private readonly BlockingCollection<EventArgs> _blockingCollection;
    public Listener()
    {
        var producer = new Producer();
        _blockingCollection = new BlockingCollection<EventArgs>(10);
        producer.Handler += producer_Handler;
    }

    void producer_Handler(object sender, EventArgs eventArgs)
    {
        _blockingCollection.TryAdd(eventArgs); //no error handling for simplicity sake
    }

    internal void ProcessEvents()
    {
        while (true)
        {
            EventArgs eventArgs;
            try
            {
                if (_blockingCollection.TryTake(out eventArgs))
                    Console.WriteLine("Received event");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            } } } }

If I would start it as:

如果我这样开头:

class Program
    {
        static void Main(string[] args)
        {
            var listner = new Listener();
            listner.ProcessEvents();
        } }

I got expected behaviour (from time to time I'm getting the event through blocking collection.

我得到了预期的行为(我不时通过阻塞集合获得事件)。

However if I would wrap this call in Task:

但是,如果我把这个调用放在任务中:

Task.Factory.StartNew(() => {
     var listner = new Listener();
     listner.ProcessEvents(); });

I will never got into the processing part. Any idea why it would be? Do I miss something obvious?

我永远也进不了加工环节。知道为什么吗?我是否遗漏了一些显而易见的东西?

On the side, does someone know a good description of a pattern that will help here? I'm looking to pick-up events on one thread (preferably not the main application thread), then process them on a separate thread at the same time blocking if the number of events is getting too high (typical thresholdingI assume)

另一方面,有人知道一个很好的模式描述吗?我希望在一个线程上提取事件(最好不是主应用程序线程),然后在一个单独的线程上处理它们,同时如果事件的数量太大,则阻塞它们(我假设是典型的阈值)。

Thanks in advance,

提前谢谢,

2 个解决方案

#1


1  

Stupid me...

愚蠢的我……

When I wrap my call in Task it is run on a Background thread. I should have wait for the task to finish before quiting.

当我在任务中结束调用时,它是在后台线程上运行的。我应该等任务完成后再辞职。

Having done that, that is assigning:

这样做之后,就是分配:

var task = Task.Factory.StartNew(/*task body*/); 
task.Wait();

and waiting I have got what I want.

等待,我得到了我想要的。

#2


0  

In order to figure out what's going on you should put some logging messages and verify that no exceptions are thrown when when you start the new task:

为了弄清楚发生了什么,您应该放置一些日志消息,并验证在启动新任务时没有抛出异常:

Thread listenerThread = new Thread(() => 
{
    // print a message when the task is started
    Console.WriteLine("Task started!"); 

    var listner = new Listener();
    listner.ProcessEvents(); 
});

// set your thread to background so it doesn't live on
// even after you're 
listenerThread.IsBackground = true;
listenerThread.Start();

Thread.Join() does the same thing as Task.Wait(). You can call it without specifying a timeout, you can specify an infinite timeout or if you want your test to stop waiting for the thread to finish, then you specify a timeout in milliseconds:

join()执行与Task.Wait()相同的操作。您可以在不指定超时的情况下调用它,也可以指定无限超时,或者如果您希望您的测试停止等待线程结束,那么您可以指定以毫秒为单位的超时:

// this will block until the thread is complete
listenerThread.Join(Timeout.Infinite);

If you need to stop the test, then you have to perform an asynchronous interrupt. Interrupt raises a ThreadInterruptedException inside the Thread which you're supposed to catch (as shown in the ProcessEvents method):

如果需要停止测试,则必须执行异步中断。中断在要捕获的线程中引发一个ThreadInterruptedException(如ProcessEvents方法所示):

listenerThread.Interrupt();

Additionally, you're using a blocking collection, but you're not utilizing the blocking functionality of the collection. There is no reason why you should keep retrying when there is nothing in the collection, instead you would rather block until there is something in it (which is what this collection is designed to do):

此外,您正在使用一个阻塞集合,但是没有使用该集合的阻塞功能。当集合中没有任何东西时,你没有理由继续重复尝试,相反,你宁愿阻塞,直到有东西在里面(这是这个集合的目的):

internal void ProcessEvents()
{
    while (true)
    {
        try
        {
            var myEvent = _blockingCollection.Take();
            Console.WriteLine("Received event");
        }
        catch(ThreadInterruptedException)
        {
            Console.WriteLine("Thread interrupted!");
            break;// break out of the loop!
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            // Do you REALLY want to keep going if there is an unexpected exception?
            // Consider breaking out of the loop...
        } 
    } 
} 

#1


1  

Stupid me...

愚蠢的我……

When I wrap my call in Task it is run on a Background thread. I should have wait for the task to finish before quiting.

当我在任务中结束调用时,它是在后台线程上运行的。我应该等任务完成后再辞职。

Having done that, that is assigning:

这样做之后,就是分配:

var task = Task.Factory.StartNew(/*task body*/); 
task.Wait();

and waiting I have got what I want.

等待,我得到了我想要的。

#2


0  

In order to figure out what's going on you should put some logging messages and verify that no exceptions are thrown when when you start the new task:

为了弄清楚发生了什么,您应该放置一些日志消息,并验证在启动新任务时没有抛出异常:

Thread listenerThread = new Thread(() => 
{
    // print a message when the task is started
    Console.WriteLine("Task started!"); 

    var listner = new Listener();
    listner.ProcessEvents(); 
});

// set your thread to background so it doesn't live on
// even after you're 
listenerThread.IsBackground = true;
listenerThread.Start();

Thread.Join() does the same thing as Task.Wait(). You can call it without specifying a timeout, you can specify an infinite timeout or if you want your test to stop waiting for the thread to finish, then you specify a timeout in milliseconds:

join()执行与Task.Wait()相同的操作。您可以在不指定超时的情况下调用它,也可以指定无限超时,或者如果您希望您的测试停止等待线程结束,那么您可以指定以毫秒为单位的超时:

// this will block until the thread is complete
listenerThread.Join(Timeout.Infinite);

If you need to stop the test, then you have to perform an asynchronous interrupt. Interrupt raises a ThreadInterruptedException inside the Thread which you're supposed to catch (as shown in the ProcessEvents method):

如果需要停止测试,则必须执行异步中断。中断在要捕获的线程中引发一个ThreadInterruptedException(如ProcessEvents方法所示):

listenerThread.Interrupt();

Additionally, you're using a blocking collection, but you're not utilizing the blocking functionality of the collection. There is no reason why you should keep retrying when there is nothing in the collection, instead you would rather block until there is something in it (which is what this collection is designed to do):

此外,您正在使用一个阻塞集合,但是没有使用该集合的阻塞功能。当集合中没有任何东西时,你没有理由继续重复尝试,相反,你宁愿阻塞,直到有东西在里面(这是这个集合的目的):

internal void ProcessEvents()
{
    while (true)
    {
        try
        {
            var myEvent = _blockingCollection.Take();
            Console.WriteLine("Received event");
        }
        catch(ThreadInterruptedException)
        {
            Console.WriteLine("Thread interrupted!");
            break;// break out of the loop!
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            // Do you REALLY want to keep going if there is an unexpected exception?
            // Consider breaking out of the loop...
        } 
    } 
}