I have currently two queues and items traveling between them. Initially, an item gets put into firstQueue
, then one of three dedicated thread moves it to secondQueue
and finally another dedicated thread removes it. These moves obviously include some processing. I need to be able to get the status of any item (IN_FIRST
, AFTER_FIRST
, IN_SECOND
, AFTER_SECOND
, or ABSENT
) and I implemented it manually by doing the update of the statusMap
where the queue gets modified like
我目前有两个队列和项目在它们之间移动。一开始,一个项目被放入firstQueue,然后三个专用线程中的一个将它移动到secondQueue,最后另一个专用线程将它移除。这些动作显然包括一些处理。我需要能够获得任何项的状态(IN_FIRST、AFTER_FIRST、IN_SECOND、AFTER_SECOND或lack),我通过更新statusMap来手动实现它,其中队列会被修改成这样
while (true) {
Item i = firstQueue.take();
statusMap.put(i, AFTER_FIRST);
process(i);
secondQueue.add(i);
statusMap.put(i, IN_SECOND);
}
This works, but it's ugly and leaves a time window where the status is inconsistent. The inconsistency is no big deal and it'd solvable by synchronization, but this could backfire as the queue is of limited capacity and may block. The ugliness bothers me more.
这是可行的,但是它很难看,并且在时间窗口中状态不一致。不一致性不是什么大问题,它可以通过同步来解决,但这可能会适得其反,因为队列的容量有限,可能会阻塞。丑陋更让我烦恼。
Efficiency hardly matters as the processing takes seconds. Dedicated threads are used in order to control concurrency. No item should ever be in multiple states (but this is not very important and not guaranteed by my current racy approach). There'll be more queues (and states) and they'll of different kinds (DelayQueue
, ArrayBlockingQueue
, and maybe PriorityQueue
).
效率并不重要,因为处理过程只需要几秒钟。使用专用线程来控制并发性。任何项目都不应该处于多个状态(但这不是很重要,我目前的方法也不能保证这一点)。将会有更多的队列(和状态),它们会有不同的类型(DelayQueue, ArrayBlockingQueue,或者PriorityQueue)。
I wonder if there's a nice solution generalizable to multiple queues?
我想知道是否有一个好的解决方案可以推广到多个队列?
4 个解决方案
#1
3
Does it make sense to wrap the queues with logic to manage the Item status?
使用逻辑包装队列来管理项目状态是否有意义?
public class QueueWrapper<E> implements BlockingQueue<E> {
private Queue<E> myQueue = new LinkedBlockingQueue<>();
private Map<E, Status> statusMap;
public QueueWrapper(Map<E, Status> statusMap) {
this.statusMap = statusMap;
}
[...]
@Override
public E take() throws InterruptedException {
E result = myQueue.take();
statusMap.put(result, Status.AFTER_FIRST);
return result;
}
That way status management is always related to (and contained in) queue operations...
这样,状态管理总是与队列操作相关(并且包含在队列中)……
Obviously statusMap
needs to be synchronized, but that would be an issue anyway.
显然,statusMap需要同步,但无论如何这都是一个问题。
#2
3
I see that your model might be improved in consistency, state control, and scaling.
我发现您的模型可能在一致性、状态控制和缩放方面得到改进。
A way of to implement this is accouple the item to your state, enqueue and dequeue this couple and create a mechanism to ensure state change.
实现这一点的一种方法是将项目与您的状态进行耦合,并对这一对进行排队和排出队列,并创建一个机制来确保状态更改。
My proposal can be see in figure below:
我的建议如下图所示:
According with this model and your example, we can to do:
根据这个模型和你的例子,我们可以做:
package *;
import java.util.concurrent.LinkedBlockingQueue;
import *.item.ItemState;
import *.task.CreatingTask;
import *.task.FirstMovingTask;
import *.task.SecondMovingTask;
public class Main {
private static void startTask(String name, Runnable r){
Thread t = new Thread(r, name);
t.start();
}
public static void main(String[] args) {
//create queues
LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue));
startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
startTask("Thread#3", new SecondMovingTask(secondQueue));
}
}
Each task runs the operations op()
of according with below affirmation on ItemState:
每项任务在项目状态上按以下确认操作op():
one of three dedicated thread moves it to secondQueue and finally another dedicated thread removes it.
三个专用线程中的一个将其移动到secondQueue,最后另一个专用线程将其删除。
ItemState
is a immutable object that contains Item
and your State
. This ensures consistency between Item and State values.
ItemState是一个不可变的对象,它包含Item和您的状态。这确保了项目和状态值之间的一致性。
ItemState has acknowledgement about the next state creating a mechanism of self-controled state:
ItemState承认下一个州建立了一个自我控制的机制:
public class FirstMovingTask {
//others codes
protected void op() {
try {
//dequeue
ItemState is0 = new ItemState(firstQueue.take());
System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
//process here
//enqueue
ItemState is1 = new ItemState(is0);
secondQueue.add(is1);
System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//others codes
}
With ItemState implemetation:
与ItemState implemetation:
public class ItemStateImpl implements ItemState {
private final Item item;
private final State state;
public ItemStateImpl(Item i){
this.item = i;
this.state = new State();
}
public ItemStateImpl(ItemState is) {
this.item = is.getItem();
this.state = is.getState().next();
}
// gets attrs
}
So this way is possible build solutions more elegant, flexible and scalable. Scalable because you can to control more states only changing next()
and generalizing the moving task for increase the number of queue.
因此,这种方式可以使构建解决方案更加优雅、灵活和可伸缩。可伸缩,因为您可以控制更多的状态,只更改next(),并将移动任务一般化,以增加队列的数量。
Results:
结果:
Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others
UPDATE(06/07/2018): analysing the use of map for search Search in map using equals values like comparator might not work because usally the mapping between values and identity (key/hash) is not one-to-one(see figure bellow). In this way is need to create an sorted list for search values which results in O(n) (worst-case).
更新(06/07/2018):使用类似于比较器之类的值来分析地图在地图上的使用,可能不会起作用,因为值和标识(键/散列)之间的映射不是一对一的(见图bellow)。这样就需要为搜索值创建一个排序列表,结果是O(n)(最坏情况)。
with Item.getValuesHashCode()
:
与Item.getValuesHashCode():
private int getValuesHashCode(){
return new HashCodeBuilder().append(value).hashCode();
}
In this case, you must keep Vector<ItemState>
instead of Item
and to use the key like the result of getValuesHashCode
. Change the mechanism of state-control for keep first reference of the Item and the state current. See bellow:
在这种情况下,必须保留Vector
//Main.class
public static void main(String[] args) {
... others code ...
//references repository
ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
... others code ...
}
//CreateTask.class
protected void op() throws InterruptedException {
//create item
ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
//put in monitor and enqueue
int key = is.getHashValue();
Vector<ItemState> items = map.get(key);
if (items == null){
items = new Vector<>();
map.put(key, items);
}
items.add(is);
//enqueue
queue.put(is);
}
//FirstMovingTask.class
protected void op() throws InterruptedException{
//dequeue
ItemState is0 = firstQueue.take();
//process
ItemState is1 = process(is0.next());
//enqueue
secondQueue.put(is1.next());
}
//ItemState.class
public ItemState next() {
//required for consistent change state
synchronized (state) {
state = state.next();
return this;
}
}
To search you must use concurrentMapRef.get(key). The result will the reference of updated ItemState.
要搜索,必须使用concurrentMapRef.get(key)。结果将引用更新的ItemState。
Results in my tests for :
我的测试结果如下:
# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_FIRST
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - IN_SECOND
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_SECOND
Item#12#1 : a - IN_FIRST
More details in code: https://github.com/ag-studies/*-queue
代码中的更多细节:https://github.com/ag-studies/*-queue
UPDATED IN 06/09/2018: redesign
更新06/09/2018:重新设计
Generalizing this project, I can undestand that the state machine is something like:
概括这个项目,我可以毫不夸张地说,状态机是这样的:
In this way I decoupled the workers of the queues for improve concepts. I used an MemoryRep for keep the unique reference for item in overall processment. Of course that you can use strategies event-based if you need keep ItemState in a physic repository.
通过这种方式,我解耦了队列的工作人员以改进概念。我使用了一个MemoryRep来在整个过程中保存项目的惟一引用。当然,如果需要在物理存储库中保存ItemState,可以使用基于事件的策略。
This keep the previous idea and creates more legibility for the concepts. See this:
这保持了之前的想法,并为概念创建了更清晰的概念。看到这个:
I understand that each job will have two queue (input/output) and relationship with a business model! The researcher will always find the most updated and consistent state of Item.
我理解每个作业将有两个队列(输入/输出)和一个业务模型的关系!研究者总是能找到最更新、最一致的项目状态。
So, answering your ask:
所以,回答你的问:
-
I can find the consistent state of Item anywhere using MemoryRep (basically an Map), wrapping state and item in ItemState, and controlling the change state on job on enqueue or dequeue it.
我可以使用MemoryRep(基本上是映射)、在ItemState中包装状态和项目,以及控制作业上的更改状态,从而在任何地方找到项目的一致状态。
-
The performace is keeped, except on running of next()
除了运行next()之外,性能保持不变
-
The state is allways consistent (for your problem)
国家是始终如一的(对于你的问题)
-
In this model is possible use any queue type, any number of jobs/queues, and any number of state.
在这个模型中,可以使用任何队列类型、任意数量的作业/队列和任意数量的状态。
-
Additionaly this is beautiful!!
另外这是美丽的! !
#3
1
As previously answered, Wrap the queues or the item would be viable solutions or both.
如前所述,包装队列或项目都是可行的解决方案,或者两者都是。
public class ItemWrapper<E> {
E item;
Status status;
public ItemWrapper(Item i, Status s){ ... }
public setStatus(Status s){ ... }
// not necessary if you use a queue wrapper (see queue wrapper)
public boolean equals(Object obj) {
if ( obj instanceof ItemWrapper)
return item.equals(((ItemWrapper) obj).item)
return false;
}
public int hashCode(){
return item;
}
}
...
process(item) // process update status in the item
...
Probably a better way, already answered, is to have a QueueWrapper who update the queue status. For the fun I don't use a status map but I use the previously itemwrapper it seems cleaner (a status map works too).
可能更好的方法是,有一个队列包装器来更新队列状态。为了好玩,我不使用状态映射,但是我使用前面的itemwrapper它看起来更简洁(一个状态映射也可以)。
public class QueueWrapper<E> implements Queue<E> {
private Queue<ItemWrapper<E>> myQueue;
static private Status inStatus; // FIRST
static private Status outStatus; // AFTER_FIRST
public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
@Override
public boolean add(E e) {
return myQueue.add(new ItemWrapper(e, inStatus));
}
@Override
public E remove(){
ItemWrapper<E> result = myQueue.remove();
result.setStatus(outStatus)
return result.item;
}
...
}
You can also use AOP to inject status update in your queues without changing your queues (a status map should be more appropriate than itemwrapper).
您还可以使用AOP在队列中注入状态更新,而无需更改队列(状态映射应该比itemwrapper更合适)。
Maybe I didn't answer well your question because an easy way to know where is your item could be to check in each queue with "contains" function.
也许我没有很好地回答你的问题,因为要知道你的项目在哪里,一个简单的方法是使用“contains”函数检入每个队列。
#4
1
Here's something different from what others have said. Taking from the world of queue services and systems we have the concept of message acknowledgement. This is nice, because it also gives you some built in retry logic.
这与别人所说的有所不同。从队列服务和系统的世界中,我们有消息确认的概念。这很好,因为它还提供了一些重试逻辑的构建。
I'll lay out how it would work from a high level, and if you need I can add code.
我将从更高的层次阐述它是如何工作的,如果需要,我可以添加代码。
Essentially you'll have a Set
to go with each of your queues. You'll wrap your queues in an object so that when you dequeue an item a few things happen
本质上,您将拥有一个与每个队列匹配的集合。您将把队列包装在一个对象中,以便当您对一个项目进行下队列时,会发生一些事情
- The item is removed from the queue
- 项从队列中删除
- The item is added to the associated set
- 该项被添加到相关集。
- A task (lambda containing an atomic boolean (default false)) is scheduled. When run it will remove item from the set and if the boolean is false, put it back in the queue
- 计划一个任务(包含一个原子布尔值(默认为false)的lambda)。运行时,它将从集合中删除项,如果布尔值为false,则将其放回队列中
- The item and a wrapper around the boolean are returned to the caller
- 在布尔值周围的项和包装器返回给调用者。
Once process(i);
completes, your code will indicate receipt acknowledgement to the wrapper, and the wrapper will remove the item from the set and make the boolean false.
一旦过程(我);完成后,您的代码将向包装器表明接收确认,包装器将从集合中删除该项并使布尔值为false。
A method to return status would simply check which queue or set the item is in.
返回状态的方法只需检查哪个队列或设置条目。
Note that this gives "at least once" delivery, meaning an item will be processed at least once, but potentially more than once if the processing time is too close to the timeout.
注意,这提供了“至少一次”交付,这意味着一个项目将至少被处理一次,但如果处理时间太接近超时,则可能被处理多次。
#1
3
Does it make sense to wrap the queues with logic to manage the Item status?
使用逻辑包装队列来管理项目状态是否有意义?
public class QueueWrapper<E> implements BlockingQueue<E> {
private Queue<E> myQueue = new LinkedBlockingQueue<>();
private Map<E, Status> statusMap;
public QueueWrapper(Map<E, Status> statusMap) {
this.statusMap = statusMap;
}
[...]
@Override
public E take() throws InterruptedException {
E result = myQueue.take();
statusMap.put(result, Status.AFTER_FIRST);
return result;
}
That way status management is always related to (and contained in) queue operations...
这样,状态管理总是与队列操作相关(并且包含在队列中)……
Obviously statusMap
needs to be synchronized, but that would be an issue anyway.
显然,statusMap需要同步,但无论如何这都是一个问题。
#2
3
I see that your model might be improved in consistency, state control, and scaling.
我发现您的模型可能在一致性、状态控制和缩放方面得到改进。
A way of to implement this is accouple the item to your state, enqueue and dequeue this couple and create a mechanism to ensure state change.
实现这一点的一种方法是将项目与您的状态进行耦合,并对这一对进行排队和排出队列,并创建一个机制来确保状态更改。
My proposal can be see in figure below:
我的建议如下图所示:
According with this model and your example, we can to do:
根据这个模型和你的例子,我们可以做:
package *;
import java.util.concurrent.LinkedBlockingQueue;
import *.item.ItemState;
import *.task.CreatingTask;
import *.task.FirstMovingTask;
import *.task.SecondMovingTask;
public class Main {
private static void startTask(String name, Runnable r){
Thread t = new Thread(r, name);
t.start();
}
public static void main(String[] args) {
//create queues
LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue));
startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
startTask("Thread#3", new SecondMovingTask(secondQueue));
}
}
Each task runs the operations op()
of according with below affirmation on ItemState:
每项任务在项目状态上按以下确认操作op():
one of three dedicated thread moves it to secondQueue and finally another dedicated thread removes it.
三个专用线程中的一个将其移动到secondQueue,最后另一个专用线程将其删除。
ItemState
is a immutable object that contains Item
and your State
. This ensures consistency between Item and State values.
ItemState是一个不可变的对象,它包含Item和您的状态。这确保了项目和状态值之间的一致性。
ItemState has acknowledgement about the next state creating a mechanism of self-controled state:
ItemState承认下一个州建立了一个自我控制的机制:
public class FirstMovingTask {
//others codes
protected void op() {
try {
//dequeue
ItemState is0 = new ItemState(firstQueue.take());
System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
//process here
//enqueue
ItemState is1 = new ItemState(is0);
secondQueue.add(is1);
System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//others codes
}
With ItemState implemetation:
与ItemState implemetation:
public class ItemStateImpl implements ItemState {
private final Item item;
private final State state;
public ItemStateImpl(Item i){
this.item = i;
this.state = new State();
}
public ItemStateImpl(ItemState is) {
this.item = is.getItem();
this.state = is.getState().next();
}
// gets attrs
}
So this way is possible build solutions more elegant, flexible and scalable. Scalable because you can to control more states only changing next()
and generalizing the moving task for increase the number of queue.
因此,这种方式可以使构建解决方案更加优雅、灵活和可伸缩。可伸缩,因为您可以控制更多的状态,只更改next(),并将移动任务一般化,以增加队列的数量。
Results:
结果:
Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others
UPDATE(06/07/2018): analysing the use of map for search Search in map using equals values like comparator might not work because usally the mapping between values and identity (key/hash) is not one-to-one(see figure bellow). In this way is need to create an sorted list for search values which results in O(n) (worst-case).
更新(06/07/2018):使用类似于比较器之类的值来分析地图在地图上的使用,可能不会起作用,因为值和标识(键/散列)之间的映射不是一对一的(见图bellow)。这样就需要为搜索值创建一个排序列表,结果是O(n)(最坏情况)。
with Item.getValuesHashCode()
:
与Item.getValuesHashCode():
private int getValuesHashCode(){
return new HashCodeBuilder().append(value).hashCode();
}
In this case, you must keep Vector<ItemState>
instead of Item
and to use the key like the result of getValuesHashCode
. Change the mechanism of state-control for keep first reference of the Item and the state current. See bellow:
在这种情况下,必须保留Vector
//Main.class
public static void main(String[] args) {
... others code ...
//references repository
ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
... others code ...
}
//CreateTask.class
protected void op() throws InterruptedException {
//create item
ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
//put in monitor and enqueue
int key = is.getHashValue();
Vector<ItemState> items = map.get(key);
if (items == null){
items = new Vector<>();
map.put(key, items);
}
items.add(is);
//enqueue
queue.put(is);
}
//FirstMovingTask.class
protected void op() throws InterruptedException{
//dequeue
ItemState is0 = firstQueue.take();
//process
ItemState is1 = process(is0.next());
//enqueue
secondQueue.put(is1.next());
}
//ItemState.class
public ItemState next() {
//required for consistent change state
synchronized (state) {
state = state.next();
return this;
}
}
To search you must use concurrentMapRef.get(key). The result will the reference of updated ItemState.
要搜索,必须使用concurrentMapRef.get(key)。结果将引用更新的ItemState。
Results in my tests for :
我的测试结果如下:
# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_FIRST
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - IN_SECOND
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_SECOND
Item#12#1 : a - IN_FIRST
More details in code: https://github.com/ag-studies/*-queue
代码中的更多细节:https://github.com/ag-studies/*-queue
UPDATED IN 06/09/2018: redesign
更新06/09/2018:重新设计
Generalizing this project, I can undestand that the state machine is something like:
概括这个项目,我可以毫不夸张地说,状态机是这样的:
In this way I decoupled the workers of the queues for improve concepts. I used an MemoryRep for keep the unique reference for item in overall processment. Of course that you can use strategies event-based if you need keep ItemState in a physic repository.
通过这种方式,我解耦了队列的工作人员以改进概念。我使用了一个MemoryRep来在整个过程中保存项目的惟一引用。当然,如果需要在物理存储库中保存ItemState,可以使用基于事件的策略。
This keep the previous idea and creates more legibility for the concepts. See this:
这保持了之前的想法,并为概念创建了更清晰的概念。看到这个:
I understand that each job will have two queue (input/output) and relationship with a business model! The researcher will always find the most updated and consistent state of Item.
我理解每个作业将有两个队列(输入/输出)和一个业务模型的关系!研究者总是能找到最更新、最一致的项目状态。
So, answering your ask:
所以,回答你的问:
-
I can find the consistent state of Item anywhere using MemoryRep (basically an Map), wrapping state and item in ItemState, and controlling the change state on job on enqueue or dequeue it.
我可以使用MemoryRep(基本上是映射)、在ItemState中包装状态和项目,以及控制作业上的更改状态,从而在任何地方找到项目的一致状态。
-
The performace is keeped, except on running of next()
除了运行next()之外,性能保持不变
-
The state is allways consistent (for your problem)
国家是始终如一的(对于你的问题)
-
In this model is possible use any queue type, any number of jobs/queues, and any number of state.
在这个模型中,可以使用任何队列类型、任意数量的作业/队列和任意数量的状态。
-
Additionaly this is beautiful!!
另外这是美丽的! !
#3
1
As previously answered, Wrap the queues or the item would be viable solutions or both.
如前所述,包装队列或项目都是可行的解决方案,或者两者都是。
public class ItemWrapper<E> {
E item;
Status status;
public ItemWrapper(Item i, Status s){ ... }
public setStatus(Status s){ ... }
// not necessary if you use a queue wrapper (see queue wrapper)
public boolean equals(Object obj) {
if ( obj instanceof ItemWrapper)
return item.equals(((ItemWrapper) obj).item)
return false;
}
public int hashCode(){
return item;
}
}
...
process(item) // process update status in the item
...
Probably a better way, already answered, is to have a QueueWrapper who update the queue status. For the fun I don't use a status map but I use the previously itemwrapper it seems cleaner (a status map works too).
可能更好的方法是,有一个队列包装器来更新队列状态。为了好玩,我不使用状态映射,但是我使用前面的itemwrapper它看起来更简洁(一个状态映射也可以)。
public class QueueWrapper<E> implements Queue<E> {
private Queue<ItemWrapper<E>> myQueue;
static private Status inStatus; // FIRST
static private Status outStatus; // AFTER_FIRST
public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
@Override
public boolean add(E e) {
return myQueue.add(new ItemWrapper(e, inStatus));
}
@Override
public E remove(){
ItemWrapper<E> result = myQueue.remove();
result.setStatus(outStatus)
return result.item;
}
...
}
You can also use AOP to inject status update in your queues without changing your queues (a status map should be more appropriate than itemwrapper).
您还可以使用AOP在队列中注入状态更新,而无需更改队列(状态映射应该比itemwrapper更合适)。
Maybe I didn't answer well your question because an easy way to know where is your item could be to check in each queue with "contains" function.
也许我没有很好地回答你的问题,因为要知道你的项目在哪里,一个简单的方法是使用“contains”函数检入每个队列。
#4
1
Here's something different from what others have said. Taking from the world of queue services and systems we have the concept of message acknowledgement. This is nice, because it also gives you some built in retry logic.
这与别人所说的有所不同。从队列服务和系统的世界中,我们有消息确认的概念。这很好,因为它还提供了一些重试逻辑的构建。
I'll lay out how it would work from a high level, and if you need I can add code.
我将从更高的层次阐述它是如何工作的,如果需要,我可以添加代码。
Essentially you'll have a Set
to go with each of your queues. You'll wrap your queues in an object so that when you dequeue an item a few things happen
本质上,您将拥有一个与每个队列匹配的集合。您将把队列包装在一个对象中,以便当您对一个项目进行下队列时,会发生一些事情
- The item is removed from the queue
- 项从队列中删除
- The item is added to the associated set
- 该项被添加到相关集。
- A task (lambda containing an atomic boolean (default false)) is scheduled. When run it will remove item from the set and if the boolean is false, put it back in the queue
- 计划一个任务(包含一个原子布尔值(默认为false)的lambda)。运行时,它将从集合中删除项,如果布尔值为false,则将其放回队列中
- The item and a wrapper around the boolean are returned to the caller
- 在布尔值周围的项和包装器返回给调用者。
Once process(i);
completes, your code will indicate receipt acknowledgement to the wrapper, and the wrapper will remove the item from the set and make the boolean false.
一旦过程(我);完成后,您的代码将向包装器表明接收确认,包装器将从集合中删除该项并使布尔值为false。
A method to return status would simply check which queue or set the item is in.
返回状态的方法只需检查哪个队列或设置条目。
Note that this gives "at least once" delivery, meaning an item will be processed at least once, but potentially more than once if the processing time is too close to the timeout.
注意,这提供了“至少一次”交付,这意味着一个项目将至少被处理一次,但如果处理时间太接近超时,则可能被处理多次。