用于重复非锁定条件信令的Java抽象

时间:2022-05-07 21:06:32

Sorry about the confusing title. Not sure quite how to phrase it, which may be the problem!

抱歉这个令人困惑的标题。不确定如何表达它,这可能是问题!

I'm looking for a good abstraction to use for a situation involving concurrent threads.

我正在寻找一个很好的抽象来用于涉及并发线程的情况。

I've been able to get close, but not quite nail it.

我已经能够接近,但并没有完全钉住它。

Simplified slightly, I've got two kinds of sensor input being collected on an Android phone - orientation-type stuff and WiFi scans.

稍微简化,我在Android手机上收集了两种传感器输入 - 方向类型的东西和WiFi扫描。

When enough of both have been collected, I want to trigger an event to occur which uses the data. However, I don't want to stop there - this process should continue N times.

当收集到足够的两者时,我想触发一个使用数据的事件。但是,我不想停在那里 - 这个过程应该继续N次。

At first I just used while loops on the condition, e.g.

起初我只是在条件上使用while循环,例如

startCollecting();
while (samplesCollected < X){
    // wait
    while (directionCount < Y || scanCount < Z){}; 
    // then
    doSomeStuff();
    samplesCollected++; 
}
stopCollecting();

However, I've been told by SO that this is a poor performer, and indeed I have been experiencing some lock up of the UI (even though it's on a different thread), so I decided to utilise java.util.concurrent.

然而,我被SO告知这是一个表现不佳的人,事实上我已经遇到了一些UI的锁定(即使它在不同的线程上),所以我决定利用java.util.concurrent。

The problem is that I can't quite work out which abstraction to use, perhaps due to my inexperience.

问题是我无法确定要使用哪种抽象,可能是由于我的经验不足。

  • Condition(s) on ReentrantLock:

    ReentrantLock上的条件:

    idea of condition seems great - but it's not the case that I want to control a Shared Resource - I want the data collection to continue in the background whilst the first batch is processed - so where do I call lock? If I don't lock, then it throws an IllegalMonitorStateException.

    条件的想法似乎很好 - 但我不想控制共享资源 - 我希望数据收集在后台继续进行,同时处理第一批 - 所以我在哪里调用锁?如果我没有锁定,那么它会抛出IllegalMonitorStateException。

  • CountdownLatch:

    CountdownLatch:

    seems ideal - when the collecting threads have data available, they can call countDown(), and when countDown has been called enough times then the action can continue. But countDown is supposed to be a one-off execution, and I need this to repeat several times.

    看起来很理想 - 当收集线程有数据可用时,它们可以调用countDown(),并且当countDown被调用足够多次时,该操作可以继续。但是countDown应该是一次性执行,我需要重复几次。

  • CyclicBarrier:

    的CyclicBarrier:

    docs for CountdownLatch suggest a CyclicBarrier should be used instead if you want the behaviour to be repeatable - but the metaphor of the CyclicBarrier seems totally wrong for this situation, and I don't understand how to use it for this.

    如果你希望行为是可重复的,那么CountdownLatch的文档建议应该使用CyclicBarrier - 但CyclicBarrier的隐喻对于这种情况似乎完全错误,我不明白如何使用它。

I've linked a few related questions below - any guidance would be much appreciated.

我已将下面的几个相关问题联系起来了 - 我们非常感谢任何指导。

Efficiency - use of Thread.yield in loop that waits for variable change

效率 - 在循环中使用Thread.yield等待变量更改

How to make a Java thread wait for another thread's output?

如何让Java线程等待另一个线程的输出?

is there a 'block until condition becomes true' function in java?

在java中有一个'阻止直到条件变为真'的功能吗?

A simple scenario using wait() and notify() in java

在java中使用wait()和notify()的简单方案

3 个解决方案

#1


2  

We have a similar implementaion in our code. We have created a inner class which implements runnable and can process the data. We keep on reading the data in a single thread and once the size of data reaches a particular limit then we pass on that data to an instance of inner class and submit that inner class instance as task to ThreadPoolExecutor service.

我们的代码中有类似的实现。我们创建了一个实现runnable的内部类,可以处理数据。我们继续在单个线程中读取数据,一旦数据大小达到特定限制,我们将该数据传递给内部类的实例,并将该内部类实例作为任务提交给ThreadPoolExecutor服务。

This works very well for us.

这对我们非常有效。

#2


2  

You might wish to use a concurrent queue (of the BlockingQueue variety).

您可能希望使用并发队列(BlockingQueue变种)。

You fill the queue from the threads reading the sensors (instead of putting them into whatever structures you are putting them in now).

您从读取传感器的线程中填充队列(而不是将它们放入您现在放置它们的任何结构中)。

In your loop, take a piece of data, examine what it is (orientation or wifi), increment the correct counter, and put the data somewhere (probably a local list of some sort). Once you have enough, pass the data you collected to your processing function.

在你的循环中,获取一段数据,检查它是什么(方向或wifi),增加正确的计数器,并将数据放在某处(可能是某种类型的本地列表)。获得足够的数据后,将收集的数据传递给处理函数。

This works because your thread sleeps when it tries to take something from the queue and nothing is available, so it doesn't sit around polling the counter.

这是有效的,因为你的线程在尝试从队列中取出某些内容并且没有任何可用时会休眠,因此它不会围绕轮询计数器。

#3


1  

You're app locked up because that piece of code is busy waiting which is generally a bad thing. For a quick fix you can add a Thread.sleep(25) (or equivalent) in the inner while loop and that should fix the lock up.

你的应用被锁定了,因为那段代码忙着等待,这通常是一件坏事。为了快速修复,您可以在内部while循环中添加Thread.sleep(25)(或等效项),这应该可以解决锁定问题。

Couple other things ... First off the variables samplesCollected, directionCount, and scanCount should be marked volatile or be AtomicLong (or AtomicInteger). Otherwise you are not guaranteed to see changes made in another thread to them. Read about memory barriers and more specifically about the Java memory model to understand why.

结合其他事情......首先,变量samplesCollected,directionCount和scanCount应标记为volatile或者是AtomicLong(或AtomicInteger)。否则,您无法保证在另一个线程中看到对它们所做的更改。阅读内存障碍,更具体地说,了解Java内存模型以了解原因。

If you ensure the variables are thread safe and add the Thread.sleep(...) you should be fine (it won't lock up your app and should function correctly) though it's not an ideal solution.

如果你确保变量是线程安全的并添加Thread.sleep(...)你应该没问题(它不会锁定你的应用程序并且应该正常运行)虽然它不是一个理想的解决方案。

This isn't an ideal solution though. You can get rid of this master thread by having the worker threads themselves check after each increment if it's above the threshold. If so then they can kick off a thread (or in the same thread) execute your post-aggregation code. Similarly if it reaches a certain max threshold you can signal all the threads to stop collecting. You'll need to use AtomicLong.incrementAndGet() for this to work properly though to ensure that the threads process the counts properly (volatile will not work).

但这不是一个理想的解决方案。如果工作线程在每个增量之后检查它是否高于阈值,则可以摆脱此主线程。如果是这样,那么他们可以启动一个线程(或在同一个线程中)执行您的聚合后代码。同样,如果达到某个最大阈值,您可以通知所有线程停止收集。您需要使用AtomicLong.incrementAndGet()才能正常工作,以确保线程正确处理计数(volatile不起作用)。

#1


2  

We have a similar implementaion in our code. We have created a inner class which implements runnable and can process the data. We keep on reading the data in a single thread and once the size of data reaches a particular limit then we pass on that data to an instance of inner class and submit that inner class instance as task to ThreadPoolExecutor service.

我们的代码中有类似的实现。我们创建了一个实现runnable的内部类,可以处理数据。我们继续在单个线程中读取数据,一旦数据大小达到特定限制,我们将该数据传递给内部类的实例,并将该内部类实例作为任务提交给ThreadPoolExecutor服务。

This works very well for us.

这对我们非常有效。

#2


2  

You might wish to use a concurrent queue (of the BlockingQueue variety).

您可能希望使用并发队列(BlockingQueue变种)。

You fill the queue from the threads reading the sensors (instead of putting them into whatever structures you are putting them in now).

您从读取传感器的线程中填充队列(而不是将它们放入您现在放置它们的任何结构中)。

In your loop, take a piece of data, examine what it is (orientation or wifi), increment the correct counter, and put the data somewhere (probably a local list of some sort). Once you have enough, pass the data you collected to your processing function.

在你的循环中,获取一段数据,检查它是什么(方向或wifi),增加正确的计数器,并将数据放在某处(可能是某种类型的本地列表)。获得足够的数据后,将收集的数据传递给处理函数。

This works because your thread sleeps when it tries to take something from the queue and nothing is available, so it doesn't sit around polling the counter.

这是有效的,因为你的线程在尝试从队列中取出某些内容并且没有任何可用时会休眠,因此它不会围绕轮询计数器。

#3


1  

You're app locked up because that piece of code is busy waiting which is generally a bad thing. For a quick fix you can add a Thread.sleep(25) (or equivalent) in the inner while loop and that should fix the lock up.

你的应用被锁定了,因为那段代码忙着等待,这通常是一件坏事。为了快速修复,您可以在内部while循环中添加Thread.sleep(25)(或等效项),这应该可以解决锁定问题。

Couple other things ... First off the variables samplesCollected, directionCount, and scanCount should be marked volatile or be AtomicLong (or AtomicInteger). Otherwise you are not guaranteed to see changes made in another thread to them. Read about memory barriers and more specifically about the Java memory model to understand why.

结合其他事情......首先,变量samplesCollected,directionCount和scanCount应标记为volatile或者是AtomicLong(或AtomicInteger)。否则,您无法保证在另一个线程中看到对它们所做的更改。阅读内存障碍,更具体地说,了解Java内存模型以了解原因。

If you ensure the variables are thread safe and add the Thread.sleep(...) you should be fine (it won't lock up your app and should function correctly) though it's not an ideal solution.

如果你确保变量是线程安全的并添加Thread.sleep(...)你应该没问题(它不会锁定你的应用程序并且应该正常运行)虽然它不是一个理想的解决方案。

This isn't an ideal solution though. You can get rid of this master thread by having the worker threads themselves check after each increment if it's above the threshold. If so then they can kick off a thread (or in the same thread) execute your post-aggregation code. Similarly if it reaches a certain max threshold you can signal all the threads to stop collecting. You'll need to use AtomicLong.incrementAndGet() for this to work properly though to ensure that the threads process the counts properly (volatile will not work).

但这不是一个理想的解决方案。如果工作线程在每个增量之后检查它是否高于阈值,则可以摆脱此主线程。如果是这样,那么他们可以启动一个线程(或在同一个线程中)执行您的聚合后代码。同样,如果达到某个最大阈值,您可以通知所有线程停止收集。您需要使用AtomicLong.incrementAndGet()才能正常工作,以确保线程正确处理计数(volatile不起作用)。