Java中线程安全队列和“master/worker”程序的模式/原则

时间:2021-09-24 11:46:27

I have a problem which I believe is the classic master/worker pattern, and I'm seeking advice on implementation. Here's what I currently am thinking about the problem:

我有一个问题,我认为是典型的主/工模式,我正在寻求关于实现的建议。以下是我目前对这个问题的看法:

There's a global "queue" of some sort, and it is a central place where "the work to be done" is kept. Presumably this queue will be managed by a kind of "master" object. Threads will be spawned to go find work to do, and when they find work to do, they'll tell the master thing (whatever that is) to "add this to the queue of work to be done".

有某种全局“队列”,它是保存“要做的工作”的中心位置。假定这个队列将由一种“主”对象管理。线程将被派生出来去找工作去做,当他们找到工作去做时,他们会告诉master thing(不管那是什么)“把这个添加到工作的队列中”。

The master, perhaps on an interval, will spawn other threads that actually perform the work to be done. Once a thread completes its work, I'd like it to notify the master that the work is finished. Then, the master can remove this work from the queue.

主线程(可能是在一个间隔中)将生成其他实际执行要执行的工作的线程。一旦一个线程完成了它的工作,我希望它通知主工作已经完成。然后,master可以从队列中删除该工作。

I've done a fair amount of thread programming in Java in the past, but it's all been prior to JDK 1.5 and consequently I am not familiar with the appropriate new APIs for handling this case. I understand that JDK7 will have fork-join, and that that might be a solution for me, but I am not able to use an early-access product in this project.

过去我在Java中做了大量的线程编程,但都是在JDK 1.5之前完成的,因此我不熟悉处理这种情况的合适的新api。我知道JDK7将有fork-join,这对我来说可能是一个解决方案,但是我不能在这个项目中使用早期访问产品。

The problems, as I see them, are:

在我看来,问题是:

1) how to have the "threads doing the work" communicate back to the master telling them that their work is complete and that the master can now remove the work from the queue

1)如何让“执行工作的线程”与主服务器通信,告诉主服务器他们的工作已经完成,主服务器现在可以从队列中删除该工作

2) how to efficiently have the master guarantee that work is only ever scheduled once. For example, let's say this queue has a million items, and it wants to tell a worker to "go do these 100 things". What's the most efficient way of guaranteeing that when it schedules work to the next worker, it gets "the next 100 things" and not "the 100 things I've already scheduled"?

2)如何有效地保证只安排一次工作。例如,假设这个队列有一百万项,它想告诉一个工作人员“去做这100件事”。什么是最有效的方法来保证当它为下一个员工安排工作时,它会得到“下一个100件事”而不是“我已经安排了的100件事”?

3) choosing an appropriate data structure for the queue. My thinking here is that the "threads finding work to do" could potentially find the same work to do more than once, and they'd send a message to the master saying "here's work", and the master would realize that the work has already been scheduled and consequently should ignore the message. I want to ensure that I choose the right data structure such that this computation is as cheap as possible.

3)为队列选择合适的数据结构。我的想法是“线程发现工作要做”可能会不止一次发现同样的工作要做,他们会发送消息到主说“这是工作”,和主意识到这项工作已经安排,因此应该忽略该消息。我想确保我选择了正确的数据结构,这样计算就尽可能地便宜。

Traditionally, I would have done this in a database, in sort of a finite-state-machine manner, working "tasks" through from start to complete. However, in this problem, I don't want to use a database because of the high volume and volatility of the queue. In addition, I'd like to keep this as light-weight as possible. I don't want to use any app server if that can be avoided.

传统上,我应该在数据库中完成这一工作,以一种有限状态机的方式,从开始到完成的工作“任务”。但是,在这个问题中,我不希望使用数据库,因为队列的容量和波动性很大。另外,我想尽量保持这个重量轻一点。如果可以避免的话,我不想使用任何应用服务器。

It is quite likely that this problem I'm describing is a common problem with a well-known name and accepted set of solutions, but I, with my lowly non-CS degree, do not know what this is called (i.e. please be gentle).

我所描述的这个问题很可能是一个常见的问题,有一个众所周知的名字和一套公认的解决方案,但是我,以我低的非cs学位,不知道这叫什么(也就是说,请温柔一些)。

Thanks for any and all pointers.

谢谢你的指点。

6 个解决方案

#1


7  

As far as I understand your requirements, you need ExecutorService. ExecutorService have

据我了解你的要求,你需要执行服务。ExecutorService有

submit(Callable task)

method which return value is Future. Future is a blocking way to communicate back from worker to master. You could easily expand this mechanism to work is asynchronous manner. And yes, ExecutorService also maintaining work queue like ThreadPoolExecutor. So you don't need to bother about scheduling, in most cases. java.util.concurrent package already have efficient implementations of thread safe queue (ConcurrentLinked queue - nonblocking, and LinkedBlockedQueue - blocking).

返回值为Future的方法。未来是一种阻断员工与主人之间交流的方式。可以很容易地将这种机制扩展为异步方式。是的,ExecutorService也像ThreadPoolExecutor一样维护工作队列。所以在大多数情况下,你不需要操心调度问题。java.util。并发包已经有了线程安全队列(ConcurrentLinked queue - non - blocking, LinkedBlockedQueue - blocking)的有效实现。

#2


4  

Check out java.util.concurrent in the Java library.

查看java.util。在Java库中并发。

Depending on your application it might be as simple as cobbling together some blocking queue and a ThreadPoolExecutor.

根据应用程序的不同,它可以简单地拼凑一些阻塞队列和一个ThreadPoolExecutor。

Also, the book Java Concurrency in Practice by Brian Goetz might be helpful.

此外,Brian Goetz在实践中编写的《Java并发》这本书可能会有帮助。

#3


4  

First, why do you want to hold the items after a worker started doing them? Normally, you would have a queue of work and a worker takes items out of this queue. This would also solve the "how can I prevent workers from getting the same item"-problem.

首先,为什么你想要在一个工人开始做这些事情后仍然保留这些东西?通常情况下,您会有一个工作队列,一个worker会从这个队列中取出项。这也将解决“我怎样才能阻止工人得到同样的东西”的问题。

To your questions:

你的问题:

1) how to have the "threads doing the work" communicate back to the master telling them that their work is complete and that the master can now remove the work from the queue

1)如何让“执行工作的线程”与主服务器通信,告诉主服务器他们的工作已经完成,主服务器现在可以从队列中删除该工作

The master could listen to the workers using the listener/observer pattern

master可以使用listener/observer模式监听worker

2) how to efficiently have the master guarantee that work is only ever scheduled once. For example, let's say this queue has a million items, and it wants to tell a worker to "go do these 100 things". What's the most efficient way of guaranteeing that when it schedules work to the next worker, it gets "the next 100 things" and not "the 100 things I've already scheduled"?

2)如何有效地保证只安排一次工作。例如,假设这个队列有一百万项,它想告诉一个工作人员“去做这100件事”。什么是最有效的方法来保证当它为下一个员工安排工作时,它会得到“下一个100件事”而不是“我已经安排了的100件事”?

See above. I would let the workers pull the items out of the queue.

见上图。我会让工人们把东西从队伍里拿出来。

3) choosing an appropriate data structure for the queue. My thinking here is that the "threads finding work to do" could potentially find the same work to do more than once, and they'd send a message to the master saying "here's work", and the master would realize that the work has already been scheduled and consequently should ignore the message. I want to ensure that I choose the right data structure such that this computation is as cheap as possible.

3)为队列选择合适的数据结构。我的想法是“线程发现工作要做”可能会不止一次发现同样的工作要做,他们会发送消息到主说“这是工作”,和主意识到这项工作已经安排,因此应该忽略该消息。我想确保我选择了正确的数据结构,这样计算就尽可能地便宜。

There are Implementations of a blocking queue since Java 5

从Java 5开始就有阻塞队列的实现

#4


1  

Don't forget Jini and Javaspaces. What you're describing sounds very like the classic producer/consumer pattern that space-based architectures excel at.

不要忘记Jini和Javaspaces。您所描述的内容听起来非常像基于空间的体系结构擅长的典型生产者/消费者模式。

A producer will write the jobs into the space. 1 or more consumers will take out jobs (under a transaction) and work on that in parallel, and then write the results back. Since it's under a transaction, if a problem occurs the job is made available again for another consumer .

一个制作人将把作业写进这个空间。1个或更多的消费者将在事务下进行工作并并行处理,然后将结果写回。由于它是在一个事务下,如果出现问题,则该作业将再次提供给另一个使用者。

You can scale this trivially by adding more consumers. This works especially well when the consumers are separate VMs and you scale across the network.

你可以通过增加更多的消费者来扩大这个范围。当消费者是独立的vm并且跨网络扩展时,这种方法尤其有效。

#5


0  

If you are open to the idea of Spring, then check out their Spring Integration project. It gives you all the queue/thread-pool boilerplate out of the box and leaves you to focus on the business logic. Configuration is kept to a minimum using @annotations.

如果您愿意接受Spring的概念,那么请查看他们的Spring集成项目。它为您提供了开箱即用的所有队列/线程池样板文件,并让您专注于业务逻辑。使用@annotation将配置保持在最小。

btw, the Goetz is very good.

顺便说一句,Goetz很不错。

#6


0  

This doesn't sound like a master-worker problem, but a specialized client above a threadpool. Given that you have a lot of scavenging threads and not a lot of processing units, it may be worthwhile simply doing a scavaging pass and then a computing pass. By storing the work items in a Set, the uniqueness constraint will remove duplicates. The second pass can submit all of the work to an ExecutorService to perform the process in parallel.

这听起来不像是主工问题,而是线程池之上的专门客户端。既然您有很多清除线程,而没有很多处理单元,那么简单地进行清除传递和计算传递可能是值得的。通过将工作项存储在一个集合中,惟一性约束将删除重复项。第二遍可以将所有工作提交给ExecutorService以并行执行流程。

A master-worker model generally assumes that the data provider has all of the work and supplies it to the master to manage. The master controls the work execution and deals with distributed computation, time-outs, failures, retries, etc. A fork-join abstraction is a recursive rather than iterative data provider. A map-reduce abstraction is a multi-step master-worker that is useful in certain scenarios.

主工作者模型通常假定数据提供者拥有所有的工作,并将其提供给管理员来管理。主控制器控制工作执行并处理分布式计算、超时、故障、重试等等。fork-join抽象是递归的,而不是迭代的数据提供者。map-reduce抽象是一个多步骤的主工作程序,在某些场景中非常有用。

A good example of master-worker is for trivially parallel problems, such as finding prime numbers. Another is a data load where each entry is independant (validate, transform, stage). The need to process a known working set, handle failures, etc. is what makes a master-worker model different than a thread-pool. This is why a master must be in control and pushes the work units out, whereas a threadpool allows workers to pull work from a shared queue.

熟练工人的一个很好的例子是一些琐碎的并行问题,比如找到质数。另一个是数据加载,其中每个条目都是独立的(验证、转换、阶段)。处理已知的工作集、处理失败等等的需要是使主工模型与线程池不同的原因。这就是为什么master必须控制并将工作单元推出,而threadpool允许工作人员从共享队列中提取工作。

#1


7  

As far as I understand your requirements, you need ExecutorService. ExecutorService have

据我了解你的要求,你需要执行服务。ExecutorService有

submit(Callable task)

method which return value is Future. Future is a blocking way to communicate back from worker to master. You could easily expand this mechanism to work is asynchronous manner. And yes, ExecutorService also maintaining work queue like ThreadPoolExecutor. So you don't need to bother about scheduling, in most cases. java.util.concurrent package already have efficient implementations of thread safe queue (ConcurrentLinked queue - nonblocking, and LinkedBlockedQueue - blocking).

返回值为Future的方法。未来是一种阻断员工与主人之间交流的方式。可以很容易地将这种机制扩展为异步方式。是的,ExecutorService也像ThreadPoolExecutor一样维护工作队列。所以在大多数情况下,你不需要操心调度问题。java.util。并发包已经有了线程安全队列(ConcurrentLinked queue - non - blocking, LinkedBlockedQueue - blocking)的有效实现。

#2


4  

Check out java.util.concurrent in the Java library.

查看java.util。在Java库中并发。

Depending on your application it might be as simple as cobbling together some blocking queue and a ThreadPoolExecutor.

根据应用程序的不同,它可以简单地拼凑一些阻塞队列和一个ThreadPoolExecutor。

Also, the book Java Concurrency in Practice by Brian Goetz might be helpful.

此外,Brian Goetz在实践中编写的《Java并发》这本书可能会有帮助。

#3


4  

First, why do you want to hold the items after a worker started doing them? Normally, you would have a queue of work and a worker takes items out of this queue. This would also solve the "how can I prevent workers from getting the same item"-problem.

首先,为什么你想要在一个工人开始做这些事情后仍然保留这些东西?通常情况下,您会有一个工作队列,一个worker会从这个队列中取出项。这也将解决“我怎样才能阻止工人得到同样的东西”的问题。

To your questions:

你的问题:

1) how to have the "threads doing the work" communicate back to the master telling them that their work is complete and that the master can now remove the work from the queue

1)如何让“执行工作的线程”与主服务器通信,告诉主服务器他们的工作已经完成,主服务器现在可以从队列中删除该工作

The master could listen to the workers using the listener/observer pattern

master可以使用listener/observer模式监听worker

2) how to efficiently have the master guarantee that work is only ever scheduled once. For example, let's say this queue has a million items, and it wants to tell a worker to "go do these 100 things". What's the most efficient way of guaranteeing that when it schedules work to the next worker, it gets "the next 100 things" and not "the 100 things I've already scheduled"?

2)如何有效地保证只安排一次工作。例如,假设这个队列有一百万项,它想告诉一个工作人员“去做这100件事”。什么是最有效的方法来保证当它为下一个员工安排工作时,它会得到“下一个100件事”而不是“我已经安排了的100件事”?

See above. I would let the workers pull the items out of the queue.

见上图。我会让工人们把东西从队伍里拿出来。

3) choosing an appropriate data structure for the queue. My thinking here is that the "threads finding work to do" could potentially find the same work to do more than once, and they'd send a message to the master saying "here's work", and the master would realize that the work has already been scheduled and consequently should ignore the message. I want to ensure that I choose the right data structure such that this computation is as cheap as possible.

3)为队列选择合适的数据结构。我的想法是“线程发现工作要做”可能会不止一次发现同样的工作要做,他们会发送消息到主说“这是工作”,和主意识到这项工作已经安排,因此应该忽略该消息。我想确保我选择了正确的数据结构,这样计算就尽可能地便宜。

There are Implementations of a blocking queue since Java 5

从Java 5开始就有阻塞队列的实现

#4


1  

Don't forget Jini and Javaspaces. What you're describing sounds very like the classic producer/consumer pattern that space-based architectures excel at.

不要忘记Jini和Javaspaces。您所描述的内容听起来非常像基于空间的体系结构擅长的典型生产者/消费者模式。

A producer will write the jobs into the space. 1 or more consumers will take out jobs (under a transaction) and work on that in parallel, and then write the results back. Since it's under a transaction, if a problem occurs the job is made available again for another consumer .

一个制作人将把作业写进这个空间。1个或更多的消费者将在事务下进行工作并并行处理,然后将结果写回。由于它是在一个事务下,如果出现问题,则该作业将再次提供给另一个使用者。

You can scale this trivially by adding more consumers. This works especially well when the consumers are separate VMs and you scale across the network.

你可以通过增加更多的消费者来扩大这个范围。当消费者是独立的vm并且跨网络扩展时,这种方法尤其有效。

#5


0  

If you are open to the idea of Spring, then check out their Spring Integration project. It gives you all the queue/thread-pool boilerplate out of the box and leaves you to focus on the business logic. Configuration is kept to a minimum using @annotations.

如果您愿意接受Spring的概念,那么请查看他们的Spring集成项目。它为您提供了开箱即用的所有队列/线程池样板文件,并让您专注于业务逻辑。使用@annotation将配置保持在最小。

btw, the Goetz is very good.

顺便说一句,Goetz很不错。

#6


0  

This doesn't sound like a master-worker problem, but a specialized client above a threadpool. Given that you have a lot of scavenging threads and not a lot of processing units, it may be worthwhile simply doing a scavaging pass and then a computing pass. By storing the work items in a Set, the uniqueness constraint will remove duplicates. The second pass can submit all of the work to an ExecutorService to perform the process in parallel.

这听起来不像是主工问题,而是线程池之上的专门客户端。既然您有很多清除线程,而没有很多处理单元,那么简单地进行清除传递和计算传递可能是值得的。通过将工作项存储在一个集合中,惟一性约束将删除重复项。第二遍可以将所有工作提交给ExecutorService以并行执行流程。

A master-worker model generally assumes that the data provider has all of the work and supplies it to the master to manage. The master controls the work execution and deals with distributed computation, time-outs, failures, retries, etc. A fork-join abstraction is a recursive rather than iterative data provider. A map-reduce abstraction is a multi-step master-worker that is useful in certain scenarios.

主工作者模型通常假定数据提供者拥有所有的工作,并将其提供给管理员来管理。主控制器控制工作执行并处理分布式计算、超时、故障、重试等等。fork-join抽象是递归的,而不是迭代的数据提供者。map-reduce抽象是一个多步骤的主工作程序,在某些场景中非常有用。

A good example of master-worker is for trivially parallel problems, such as finding prime numbers. Another is a data load where each entry is independant (validate, transform, stage). The need to process a known working set, handle failures, etc. is what makes a master-worker model different than a thread-pool. This is why a master must be in control and pushes the work units out, whereas a threadpool allows workers to pull work from a shared queue.

熟练工人的一个很好的例子是一些琐碎的并行问题,比如找到质数。另一个是数据加载,其中每个条目都是独立的(验证、转换、阶段)。处理已知的工作集、处理失败等等的需要是使主工模型与线程池不同的原因。这就是为什么master必须控制并将工作单元推出,而threadpool允许工作人员从共享队列中提取工作。