使用Java在线程之间管道数据

时间:2022-05-11 17:31:46

I am writing a multi-threaded application that mimics a movie theater. Each person involved is its own thread and concurrency must be done completely by semaphores. The only issue I am having is how to basically link threads so that they can communicate (via a pipe for instance).

我正在编写一个模仿电影院的多线程应用程序。涉及的每个人都是自己的线程,并发必须完全由信号量完成。我唯一的问题是如何基本上链接线程,以便他们可以通信(例如通过管道)。

For instance:

Customer[1] which is a thread, acquires a semaphore that lets it walk up to the Box Office. Now Customer[1] must tell the Box Office Agent that they want to see movie "X". Then BoxOfficeAgent[1] also a thread, must check to make sure the movie isn't full and either sell a ticket or tell Customer[1] to pick another movie.

客户[1]是一个线程,获取一个信号量,让它走向票房。现在,客户[1]必须告诉Box Office Agent他们想要看电影“X”。然后BoxOfficeAgent [1]也是一个线程,必须检查以确保电影未满,并要么卖票或告诉客户[1]选择另一部电影。

How do I pass that data back and forth while still maintaining concurrency with the semaphores?

如何在保持与信号量的并发性的同时来回传递数据?

Also, the only class I can use from java.util.concurrent is the Semaphore class.

另外,我可以在java.util.concurrent中使用的唯一类是Semaphore类。

2 个解决方案

#1


8  

One easy way to pass data back and forth between threads is to use the implementations of the interface BlockingQueue<E>, located in the package java.util.concurrent.

在线程之间来回传递数据的一种简单方法是使用位于java.util.concurrent包中的接口BlockingQueue 的实现。

This interfaces has methods to add elements to the collection with different behaviors:

此接口具有使用不同行为向集合添加元素的方法:

  • add(E): adds if possible, otherwise throws exception
  • add(E):尽可能添加,否则抛出异常

  • boolean offer(E): returns true if the element has been added, false otherwise
  • boolean offer(E):如果已添加元素,则返回true,否则返回false

  • boolean offer(E, long, TimeUnit): tries to add the element, waiting the specified amount of time
  • boolean offer(E,long,TimeUnit):尝试添加元素,等待指定的时间

  • put(E): blocks the calling thread until the element has been added
  • put(E):阻塞调用线程,直到添加元素

It also defines methods for element retrieval with similar behaviors:

它还定义了具有类似行为的元素检索方法:

  • take(): blocks until there's an element available
  • take():阻塞,直到有一个元素可用

  • poll(long, TimeUnit): retrieves an element or returns null
  • poll(long,TimeUnit):检索元素或返回null

The implementations I use most frequently are: ArrayBlockingQueue, LinkedBlockingQueue and SynchronousQueue.

我最常使用的实现是:ArrayBlockingQueue,LinkedBlockingQueue和SynchronousQueue。

The first one, ArrayBlockingQueue, has a fixed size, defined by a parameter passed to its constructor.

第一个是ArrayBlockingQueue,它有一个固定的大小,由传递给它的构造函数的参数定义。

The second, LinkedBlockingQueue, has illimited size. It will always accept any elements, that is, offer will return true immediately, add will never throw an exception.

第二个是LinkedBlockingQueue,它的大小很小。它将始终接受任何元素,即,offer将立即返回true,add将永远不会抛出异常。

The third, and to me the most interesting one, SynchronousQueue, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.

第三个,也是最有趣的一个,SynchronousQueue,正是一个管道。您可以将其视为大小为0的队列。它永远不会保留一个元素:如果某个其他线程试图从中检索元素,则此队列只接受元素。相反,如果有另一个线程试图推送它,则检索操作将仅返回一个元素。

To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:

为了满足专门用信号量完成同步的作业要求,你可以从我给你的关于SynchronousQueue的描述中获得灵感,并写一些非常相似的东西:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

Notice that this class presents similar behavior to what I described about the SynchronousQueue.

请注意,此类提供与我所描述的有关SynchronousQueue的类似行为。

Once the methods put(E) gets called it acquires the write semaphore, which will be left empty, so that another call to the same method would block at its first line. This method then stores a reference to the object being passed, and releases the read semaphore. This release will make it possible for any thread calling the take() method to proceed.

一旦调用了方法(E),它就会获取写信号量,它将保留为空,这样对同一方法的另一次调用就会阻塞它的第一行。然后,此方法存储对传递的对象的引用,并释放读取的信号量。此版本将使调用take()方法的任何线程都可以继续。

The first step of the take() method is then, naturally, to acquire the read semaphore, in order to disallow any other thread to retrieve the element concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that the method put(E) may be called again by any thread, and returns what has been saved in the local variable.

然后,take()方法的第一步自然是获取读取信号量,以便禁止任何其他线程同时检索该元素。在检索到元素并将其保存在局部变量之后(练习:如果该行E e = this.e被删除会发生什么?),该方法释放写信号量,以便方法put(E)可以被任何线程再次调用,并返回已保存在局部变量中的内容。

As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods take() and put(E) are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior of take() and put(E) breaking the contract.

作为一个重要的评论,请注意对传递的对象的引用保存在私有字段中,方法take()和put(E)都是最终的。这是至关重要的,而且常常被遗漏。如果这些方法不是最终的(或更糟糕的是,字段不是私有的),继承类将能够改变take()和put(E)违反合同的行为。

Finally, you could avoid the need to declare a local variable in the take() method by using try {} finally {} as follows:

最后,您可以通过使用try {} finally {}来避免在take()方法中声明局部变量,如下所示:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

Here, the point of this example if just to show an use of try/finally that goes unnoticed among inexperienced developers. Obviously, in this case, there's no real gain.

在这里,这个例子的目的只是为了展示在没有经验的开发人员中没有注意到的try / finally的使用。显然,在这种情况下,没有真正的收获。

Oh damn, I've mostly finished your homework for you. In retribution -- and for you to test your knowledge about Semaphores --, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an offer(E) method and a take(E, long, TimeUnit)!

哦,该死的,我大部分都为你完成了你的功课。在报复中 - 并且为了测试您对信号量的了解 - 为什么不实现BlockingQueue合约定义的其他一些方法呢?例如,您可以实现offer(E)方法和take(E,long,TimeUnit)!

Good luck.

#2


1  

Think it in terms of shared memory with read/write lock.

根据具有读/写锁定的共享内存来考虑它。

  1. Create a buffer to put the message.
  2. 创建一个缓冲区来放置消息。

  3. The access to the buffer should be controlled by using a lock/semaphore.
  4. 应使用锁/信号量来控制对缓冲区的访问。

  5. Use this buffer for inter thread communication purpose.
  6. 使用此缓冲区进行线程间通信。

Regards

PKV

#1


8  

One easy way to pass data back and forth between threads is to use the implementations of the interface BlockingQueue<E>, located in the package java.util.concurrent.

在线程之间来回传递数据的一种简单方法是使用位于java.util.concurrent包中的接口BlockingQueue 的实现。

This interfaces has methods to add elements to the collection with different behaviors:

此接口具有使用不同行为向集合添加元素的方法:

  • add(E): adds if possible, otherwise throws exception
  • add(E):尽可能添加,否则抛出异常

  • boolean offer(E): returns true if the element has been added, false otherwise
  • boolean offer(E):如果已添加元素,则返回true,否则返回false

  • boolean offer(E, long, TimeUnit): tries to add the element, waiting the specified amount of time
  • boolean offer(E,long,TimeUnit):尝试添加元素,等待指定的时间

  • put(E): blocks the calling thread until the element has been added
  • put(E):阻塞调用线程,直到添加元素

It also defines methods for element retrieval with similar behaviors:

它还定义了具有类似行为的元素检索方法:

  • take(): blocks until there's an element available
  • take():阻塞,直到有一个元素可用

  • poll(long, TimeUnit): retrieves an element or returns null
  • poll(long,TimeUnit):检索元素或返回null

The implementations I use most frequently are: ArrayBlockingQueue, LinkedBlockingQueue and SynchronousQueue.

我最常使用的实现是:ArrayBlockingQueue,LinkedBlockingQueue和SynchronousQueue。

The first one, ArrayBlockingQueue, has a fixed size, defined by a parameter passed to its constructor.

第一个是ArrayBlockingQueue,它有一个固定的大小,由传递给它的构造函数的参数定义。

The second, LinkedBlockingQueue, has illimited size. It will always accept any elements, that is, offer will return true immediately, add will never throw an exception.

第二个是LinkedBlockingQueue,它的大小很小。它将始终接受任何元素,即,offer将立即返回true,add将永远不会抛出异常。

The third, and to me the most interesting one, SynchronousQueue, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.

第三个,也是最有趣的一个,SynchronousQueue,正是一个管道。您可以将其视为大小为0的队列。它永远不会保留一个元素:如果某个其他线程试图从中检索元素,则此队列只接受元素。相反,如果有另一个线程试图推送它,则检索操作将仅返回一个元素。

To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:

为了满足专门用信号量完成同步的作业要求,你可以从我给你的关于SynchronousQueue的描述中获得灵感,并写一些非常相似的东西:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

Notice that this class presents similar behavior to what I described about the SynchronousQueue.

请注意,此类提供与我所描述的有关SynchronousQueue的类似行为。

Once the methods put(E) gets called it acquires the write semaphore, which will be left empty, so that another call to the same method would block at its first line. This method then stores a reference to the object being passed, and releases the read semaphore. This release will make it possible for any thread calling the take() method to proceed.

一旦调用了方法(E),它就会获取写信号量,它将保留为空,这样对同一方法的另一次调用就会阻塞它的第一行。然后,此方法存储对传递的对象的引用,并释放读取的信号量。此版本将使调用take()方法的任何线程都可以继续。

The first step of the take() method is then, naturally, to acquire the read semaphore, in order to disallow any other thread to retrieve the element concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that the method put(E) may be called again by any thread, and returns what has been saved in the local variable.

然后,take()方法的第一步自然是获取读取信号量,以便禁止任何其他线程同时检索该元素。在检索到元素并将其保存在局部变量之后(练习:如果该行E e = this.e被删除会发生什么?),该方法释放写信号量,以便方法put(E)可以被任何线程再次调用,并返回已保存在局部变量中的内容。

As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods take() and put(E) are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior of take() and put(E) breaking the contract.

作为一个重要的评论,请注意对传递的对象的引用保存在私有字段中,方法take()和put(E)都是最终的。这是至关重要的,而且常常被遗漏。如果这些方法不是最终的(或更糟糕的是,字段不是私有的),继承类将能够改变take()和put(E)违反合同的行为。

Finally, you could avoid the need to declare a local variable in the take() method by using try {} finally {} as follows:

最后,您可以通过使用try {} finally {}来避免在take()方法中声明局部变量,如下所示:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

Here, the point of this example if just to show an use of try/finally that goes unnoticed among inexperienced developers. Obviously, in this case, there's no real gain.

在这里,这个例子的目的只是为了展示在没有经验的开发人员中没有注意到的try / finally的使用。显然,在这种情况下,没有真正的收获。

Oh damn, I've mostly finished your homework for you. In retribution -- and for you to test your knowledge about Semaphores --, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an offer(E) method and a take(E, long, TimeUnit)!

哦,该死的,我大部分都为你完成了你的功课。在报复中 - 并且为了测试您对信号量的了解 - 为什么不实现BlockingQueue合约定义的其他一些方法呢?例如,您可以实现offer(E)方法和take(E,long,TimeUnit)!

Good luck.

#2


1  

Think it in terms of shared memory with read/write lock.

根据具有读/写锁定的共享内存来考虑它。

  1. Create a buffer to put the message.
  2. 创建一个缓冲区来放置消息。

  3. The access to the buffer should be controlled by using a lock/semaphore.
  4. 应使用锁/信号量来控制对缓冲区的访问。

  5. Use this buffer for inter thread communication purpose.
  6. 使用此缓冲区进行线程间通信。

Regards

PKV