从Java并发迁移到Scala并发

时间:2020-12-16 07:05:32

I have a fairly standard mechanism in Java for solving the problem:

我有一个相当标准的Java机制来解决这个问题:

  • Work items must be scheduled to execute at a particular time
  • 必须安排工作项在特定时间执行

  • Each work item must then wait on a condition becoming true
  • 然后,每个工作项必须等待条件成为真

  • Work items should be cancellable
  • 工作项应该可以取消

The solution I use is as follows:

我使用的解决方案如下:

  1. Have a single-threaded scheduler to schedule my work items
  2. 有一个单线程调度程序来安排我的工作项

  3. Have an ExecutorService (which may be multi-threaded)
  4. 有一个ExecutorService(可能是多线程的)

  5. Each scheduled work item then submits the actual work to the ExecutorService. The returned Future is cached in a map. A completion service is used to remove the future from the cache when the work is completed
  6. 然后,每个计划的工作项将实际工作提交给ExecutorService。返回的Future将缓存在地图中。完成服务用于在工作完成时从缓存中删除未来

  7. Items can be cancelled via the cached futures
  8. 可以通过缓存的期货取消物品

Of course, my executor needs to be at least as big as the number of blocking work items I expect to have but this is not a problem in practice.

当然,我的执行者需要至少与我期望的阻塞工作项的数量一样大,但这在实践中不是问题。

So now I'm coding in Scala and using the actor framework. Assuming that my work item can be encapsulated in an event sent to an actor:

所以现在我在Scala中编码并使用actor框架。假设我的工作项可以封装在发送给actor的事件中:

  1. What mechanism would I use to schedule a work item for a specific time?
  2. 我将使用什么机制来安排特定时间的工作项?

  3. If a work item is an event sent to an actor, how can I ensure that the backing thread pool is bigger than the number of items that can be blocking at the same time
  4. 如果工作项是发送给actor的事件,我如何确保支持线程池大于可以同时阻塞的项数

  5. How can I cause a previously scheduled work item to be cancelled?
  6. 如何取消之前安排的工作项?

2 个解决方案

#1


What mechanism would I use to schedule a work item for a specific time?

我将使用什么机制来安排特定时间的工作项?

I would use a java.util.concurrent.ScheduledExecutorService.

我会使用java.util.concurrent.ScheduledExecutorService。

If a work item is an event sent to an actor, how can I ensure that the backing thread pool is bigger than the number of items that can be blocking at the same time

如果工作项是发送给actor的事件,我如何确保支持线程池大于可以同时阻塞的项数

This strikes me as a design that defeats the effort of parallelisation. Try to minimise or eliminate blocking and global state. These are barriers to composability and scalability. For example, consider having a single dedicated thread that waits for files to arrive and then fires events off to actors. Or look at java.nio for asynchronous non-blocking I/O.

这让我觉得这是一种破坏并行化努力的设计。尽量减少或消除阻塞和全局状态。这些是可组合性和可伸缩性的障碍。例如,考虑让一个专用线程等待文件到达,然后将事件发送给actor。或者查看java.nio以获取异步非阻塞I / O.

I don't fully understand your requirements here, but it seems like you could have a single thread/actor looking for I/O events. Then as your scheduled "work items", schedule effects that create non-blocking actors. Have those actors register themselves with the I/O thread/actor to receive messages about I/O events that they care about.

我不完全理解你的要求,但似乎你可以让一个线程/角色寻找I / O事件。然后作为计划的“工作项”,安排创建非阻塞演员的效果。让这些参与者向I / O线程/ actor注册自己,以接收有关他们关心的I / O事件的消息。

How can I cause a previously scheduled work item to be cancelled?

如何取消之前安排的工作项?

ScheduledExecutorService returns Futures. What you have is not a bad design in that regard. Collect them in a Map and call future.cancel().

ScheduledExecutorService返回期货。在这方面,你所拥有的并不是一个糟糕的设计。在Map中收集它们并调用future.cancel()。

#2


You could have a scheduling actor that has a list of scheduled actors, and uses Actor.receiveWithin() to wake up every second or so and send messages to actors that are ready to be executed. The scheduling actor could also handle cancelling. Another option is to let every actor handle its own scheduling directly with receiveWithin(), instead of centralizing scheduling.

您可以拥有一个具有已调度actor列表的调度actor,并使用Actor.receiveWithin()每秒唤醒一次,并将消息发送给准备执行的actor。调度演员也可以处理取消。另一个选择是让每个actor直接使用receiveWithin()处理自己的调度,而不是集中调度。

There is some discussion on this issue in the blog post Simple cron like scheduler in Scala.

有关此问题的讨论在Scala中的简单cron(如调度程序)中发表。

#1


What mechanism would I use to schedule a work item for a specific time?

我将使用什么机制来安排特定时间的工作项?

I would use a java.util.concurrent.ScheduledExecutorService.

我会使用java.util.concurrent.ScheduledExecutorService。

If a work item is an event sent to an actor, how can I ensure that the backing thread pool is bigger than the number of items that can be blocking at the same time

如果工作项是发送给actor的事件,我如何确保支持线程池大于可以同时阻塞的项数

This strikes me as a design that defeats the effort of parallelisation. Try to minimise or eliminate blocking and global state. These are barriers to composability and scalability. For example, consider having a single dedicated thread that waits for files to arrive and then fires events off to actors. Or look at java.nio for asynchronous non-blocking I/O.

这让我觉得这是一种破坏并行化努力的设计。尽量减少或消除阻塞和全局状态。这些是可组合性和可伸缩性的障碍。例如,考虑让一个专用线程等待文件到达,然后将事件发送给actor。或者查看java.nio以获取异步非阻塞I / O.

I don't fully understand your requirements here, but it seems like you could have a single thread/actor looking for I/O events. Then as your scheduled "work items", schedule effects that create non-blocking actors. Have those actors register themselves with the I/O thread/actor to receive messages about I/O events that they care about.

我不完全理解你的要求,但似乎你可以让一个线程/角色寻找I / O事件。然后作为计划的“工作项”,安排创建非阻塞演员的效果。让这些参与者向I / O线程/ actor注册自己,以接收有关他们关心的I / O事件的消息。

How can I cause a previously scheduled work item to be cancelled?

如何取消之前安排的工作项?

ScheduledExecutorService returns Futures. What you have is not a bad design in that regard. Collect them in a Map and call future.cancel().

ScheduledExecutorService返回期货。在这方面,你所拥有的并不是一个糟糕的设计。在Map中收集它们并调用future.cancel()。

#2


You could have a scheduling actor that has a list of scheduled actors, and uses Actor.receiveWithin() to wake up every second or so and send messages to actors that are ready to be executed. The scheduling actor could also handle cancelling. Another option is to let every actor handle its own scheduling directly with receiveWithin(), instead of centralizing scheduling.

您可以拥有一个具有已调度actor列表的调度actor,并使用Actor.receiveWithin()每秒唤醒一次,并将消息发送给准备执行的actor。调度演员也可以处理取消。另一个选择是让每个actor直接使用receiveWithin()处理自己的调度,而不是集中调度。

There is some discussion on this issue in the blog post Simple cron like scheduler in Scala.

有关此问题的讨论在Scala中的简单cron(如调度程序)中发表。