How should I handle multithreading in Java?

Time: 2022-02-25 17:37:47

I am working on a practical scenario in Java: a socket program. The existing system and the expected system are as follows.

Existing system: the system checks whether a certain condition is satisfied. If it is, it creates a message to be sent and puts it into a queue.

The queue processor is a separate thread. It periodically checks the queue for items. If it finds any items (messages), it sends each message to a remote host (hardcoded) and removes the item from the queue.
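
For reference, the existing design can be sketched as below. This is a minimal sketch, not the poster's code: the class name `SingleQueueProcessor` is made up, the hardcoded socket send is replaced by a `Consumer<String>` so it runs without a network, and the periodic polling is replaced by a blocking `take()` on a `java.util.concurrent.BlockingQueue`:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch of the existing design: one queue, one processor thread, one fixed destination.
// The Consumer stands in for "send to the hardcoded remote host" (an assumption).
class SingleQueueProcessor {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> sendToFixedHost;
    private final Thread worker;

    SingleQueueProcessor(Consumer<String> sendToFixedHost) {
        this.sendToFixedHost = sendToFixedHost;
        this.worker = new Thread(this::processLoop, "queue-processor");
        this.worker.start();
    }

    // Called by the code that checks the condition and creates the message.
    void enqueue(String message) {
        queue.add(message);
    }

    private void processLoop() {
        try {
            while (true) {
                String message = queue.take(); // blocks until a message is available
                sendToFixedHost.accept(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown() interrupts us: exit the loop
        }
    }

    void shutdown() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }
}
```

Because `take()` waits until an item arrives, the processor thread uses no CPU while the queue is empty and reacts as soon as a message is added, which a fixed polling interval cannot do.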

Expected system: this is similar. A message is created when a certain condition is satisfied, but the recipient is not the same in every case. There are several possible approaches:

  1. Put the message into the same queue, but together with its receiver ID. The second thread can then identify the receiver and send the message to it.

  2. Use multiple threads. When the condition is satisfied and the receiver is new, create a new queue, put the message into that queue, and start a new thread to process that queue. If later messages are directed to the same recipient, they go into the same queue; if not, a new queue and a new thread are created.
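
Approach 1 can be sketched as the same single-queue design with each message tagged by its receiver. The names `TaggedMessage` and `RoutingQueueProcessor`, and the `BiConsumer<String, String>` used in place of the real per-receiver socket send, are assumptions made for this example:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

// A message plus the ID of the receiver it is meant for.
final class TaggedMessage {
    final String receiverId;
    final String payload;

    TaggedMessage(String receiverId, String payload) {
        this.receiverId = receiverId;
        this.payload = payload;
    }
}

// Approach 1: one shared queue and one thread that routes each message by its tag.
// sendTo is a hypothetical hook for "connect to receiverId and write payload".
class RoutingQueueProcessor {
    private final BlockingQueue<TaggedMessage> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    RoutingQueueProcessor(BiConsumer<String, String> sendTo) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    TaggedMessage m = queue.take();
                    sendTo.accept(m.receiverId, m.payload); // route by the receiver ID
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly on shutdown
            }
        }, "routing-processor");
        worker.start();
    }

    void enqueue(String receiverId, String payload) {
        queue.add(new TaggedMessage(receiverId, payload));
    }

    void shutdown() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }
}
```

A single thread still sends everything, so a slow or unreachable receiver delays messages for all the others; that is the main trade-off against approach 2.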

Now I want to implement the second approach, but I'm a bit stuck. How should I do it? A skeleton would be sufficient; you don't need to worry about how to create the queues etc. :)

Update: I also think approach 1 is the best way to do this. I read some articles on threading and came to that decision. But it is still worth learning how to implement approach 2 as well.

5 solutions

#1


First of all, if you are planning to have a lot of receivers, I would not use the ONE-THREAD-AND-QUEUE-PER-RECEIVER approach. You could end up with a lot of threads that do nothing most of the time, which could hurt your performance. An alternative is a pool of worker threads that just pick tasks from a shared queue, each task carrying its own receiver ID, plus perhaps a shared dictionary of socket connections to each receiver for the worker threads to use.
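
A minimal sketch of that alternative, assuming a hypothetical `Connection` interface in place of a real socket and a caller-supplied function that opens one: a fixed pool from `Executors.newFixedThreadPool` runs tasks that carry their receiver ID, and a `ConcurrentHashMap` plays the role of the shared dictionary of connections.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical stand-in for a socket connection to one receiver.
interface Connection {
    void write(String payload);
}

class PooledSender {
    private final ExecutorService pool;
    // Shared "dictionary": receiver ID -> open connection, reused by all workers.
    private final ConcurrentHashMap<String, Connection> connections = new ConcurrentHashMap<>();
    private final Function<String, Connection> connect; // hypothetical: opens a connection for an ID

    PooledSender(int threads, Function<String, Connection> connect) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.connect = connect;
    }

    // Each task carries its receiver ID; any free worker thread can run it.
    void send(String receiverId, String payload) {
        pool.execute(() -> {
            // computeIfAbsent applies connect at most once per receiver, even under races
            Connection c = connections.computeIfAbsent(receiverId, connect);
            synchronized (c) { // two workers must not interleave writes on one connection
                c.write(payload);
            }
        });
    }

    void shutdown() throws InterruptedException {
        pool.shutdown(); // finish queued tasks, accept no new ones
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    int openConnections() {
        return connections.size();
    }
}
```

Because any free worker may run the next task, two messages for the same receiver can be written in either order; if ordering matters, a per-receiver queue (approach 2) preserves it. Also, other threads updating the map may be blocked while `connect` runs inside `computeIfAbsent`, so a slow connect has a cost.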

Having said that, if you still want to pursue your approach, here is what you could do:

1) Create a new class to handle your new thread execution:

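import java.util.ArrayDeque;
import java.util.Queue;
public class Worker implements Runnable {
   // Queue is an interface, so instantiate a concrete implementation
   private final Queue<String> myQueue = new ArrayDeque<String>();
   public void run()
   {
       while (true) {
          String messageToProcess = null;
          synchronized (myQueue) {
             // poll() returns null when the queue is empty
             messageToProcess = myQueue.poll();
          }
          if (messageToProcess != null) {
             // do your stuff (send the message to this worker's receiver)
             continue; // check for the next message straight away
          }
          try {
             Thread.sleep(500); // queue was empty: back off to avoid spinning
          } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return; // interrupted: stop this worker
          }
       }
   }
   public void queueMessage(String message)
   {
      synchronized (myQueue) {
         myQueue.add(message);
      }
   }
}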

2) On your main thread, create the messages and use a dictionary (hash table) to check whether the receiver's thread has already been created. If it has, just queue the new message. If not, create a new thread, put it in the hash table, and queue the new message:

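// receiver ID -> Worker (java.util.Map / java.util.HashMap); only this thread touches it
Map<Integer, Worker> workers = new HashMap<Integer, Worker>();
while (true) {
   String msg = getNewCreatedMessage(); // you get your messages from here
   int id = getNewCreatedMessageId();   // you get your rec's id from here
   Worker w = workers.get(id);
   if (w == null) {   // first message for this receiver: create a new Worker thread
      w = new Worker();
      workers.put(id, w);   // remember it so later messages reuse this worker
      new Thread(w).start();
   }
   w.queueMessage(msg);
}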

Good luck.

Edit: you can improve this solution by combining it with the BlockingQueue that Brian mentioned.
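
Following that edit, here is one way the `Worker` from step 1 might look with a `BlockingQueue`. It is a sketch: `BlockingWorker` and the `Consumer<String>` send hook are illustrative names, not from the answer. `LinkedBlockingQueue` handles its own locking, so the `synchronized` blocks and the `sleep(500)` polling disappear:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Worker rewritten around a BlockingQueue. The Consumer is a hypothetical
// stand-in for "send to this worker's receiver".
class BlockingWorker implements Runnable {
    private final BlockingQueue<String> myQueue = new LinkedBlockingQueue<>();
    private final Consumer<String> send;

    BlockingWorker(Consumer<String> send) {
        this.send = send;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String messageToProcess = myQueue.take(); // sleeps until a message arrives
                send.accept(messageToProcess);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the flag and let the thread end
        }
    }

    // Safe to call from any thread; LinkedBlockingQueue does its own locking.
    public void queueMessage(String message) {
        myQueue.add(message);
    }
}
```

Step 2 stays the same: create the worker, start it with `new Thread(w).start()`, store it in the map, and call `queueMessage`. To stop a worker, interrupt its thread.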

#2


Consider using Java Message Service (JMS) rather than re-inventing the wheel?

#3


Can I suggest that you look at BlockingQueue? Your dispatch process can write to this queue (put), and clients can take or peek in a thread-safe manner, so you don't need to write the queue implementation at all.

If you have one queue containing different message types, then you will need to implement some peek-type mechanism for each client (i.e. each client has to check the head of the queue and only take what is its own). To work effectively, consumers then have to extract the data meant for them in a timely and robust fashion.

If you have one queue/thread per message/consumer type, that's going to be easier and more reliable.

Your client implementation will simply have to loop on:

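try {
   while (!done) {
      Object item = queue.take(); // blocks until an item is available
      // process item
   }
} catch (InterruptedException e) {
   Thread.currentThread().interrupt(); // restore the flag and stop consuming
}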

Note that the queue can be typed with generics (e.g. BlockingQueue<String>), and that take() blocks until an item is available.
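
The loop above does not show how `done` becomes true. One common technique (not part of the answer) is a "poison pill": the producer puts a sentinel object after its last message, and the consumer stops when it takes it. A sketch with a typed `BlockingQueue<String>`, where `PoisonPillDemo` and the `STOP` sentinel are made-up names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Sentinel compared by identity (==), so a real message "STOP" is not mistaken for it.
    static final String STOP = new String("STOP");

    // Producer puts the messages, then the pill; the consumer drains until it sees the pill.
    static List<String> run(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // typed via generics
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                boolean done = false;
                while (!done) {
                    String item = queue.take(); // blocks while the queue is empty
                    if (item == STOP) {
                        done = true;
                    } else {
                        received.add(item); // process item
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String m : messages) {
            queue.put(m);
        }
        queue.put(STOP);
        consumer.join(); // join() makes the consumer's writes to 'received' visible here
        return received;
    }
}
```

Comparing by identity means an ordinary message whose text happens to be "STOP" is still delivered; only the one sentinel instance ends the loop.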

Of course, with multiple consumers taking messages of different types, you may want to consider a space-based architecture. This won't have queue (FIFO) characteristics, but it will let you have multiple consumers very easily.

#4


You have to weigh up whether you have lots of end machines with occasional messages to each, or a few end machines with frequent messages to each.

If you have lots of end machines, then literally having one thread per end machine sounds a bit over the top, unless you're really going to be constantly streaming messages to all of those machines. I would suggest a pool of threads whose size only varies between certain bounds. To do this, you could use a ThreadPoolExecutor. When you need to post a message, you submit a Runnable to the executor, which will send the message:

Executor msgExec = new ThreadPoolExecutor(...); // constructor arguments (pool bounds, work queue) omitted

public void sendMessage(final String machineId, final byte[] message) {
  msgExec.execute(new Runnable() {
    public void run() {
      sendMessageNow(machineId, message);
    }
  });
}

private void sendMessageNow(String machineId, byte[] message) {
  // open connection to machine and send message, thinking
  // about the case of two simultaneous messages to a machine,
  // and whether you want to cache connections.
}
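
The `...` above leaves the constructor arguments open. As one possible reading of "a pool that only grows between certain bounds", here is a sketch with assumed numbers (2 core threads, 8 maximum, a bounded queue of 100, a 30-second keep-alive) and `CallerRunsPolicy` as an assumed overflow policy; `PooledMessageSender` and the `BiConsumer` send hook are illustrative names. Note that a ThreadPoolExecutor only creates threads beyond the core size when its work queue is full, so with an unbounded queue it would never grow past the core size:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class PooledMessageSender {
    // Assumed bounds: 2 threads normally, up to 8 under load; idle extra threads
    // end after 30 s. Threads beyond the core 2 are only created once the
    // 100-slot queue is full.
    private final ThreadPoolExecutor msgExec = new ThreadPoolExecutor(
            2, 8, 30, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.CallerRunsPolicy()); // pool and queue full: caller runs the task

    // Hypothetical hook standing in for sendMessageNow(machineId, message).
    private final BiConsumer<String, byte[]> sendMessageNow;

    PooledMessageSender(BiConsumer<String, byte[]> sendMessageNow) {
        this.sendMessageNow = sendMessageNow;
    }

    public void sendMessage(String machineId, byte[] message) {
        msgExec.execute(() -> sendMessageNow.accept(machineId, message));
    }

    public void shutdownAndWait() throws InterruptedException {
        msgExec.shutdown(); // already-queued messages still get sent
        msgExec.awaitTermination(30, TimeUnit.SECONDS);
    }

    int largestPoolSize() {
        return msgExec.getLargestPoolSize();
    }
}
```

`CallerRunsPolicy` makes the caller of `sendMessage` run the task itself once both the queue and the pool are full, which slows producers down instead of dropping messages. As in the answer's version, nothing here orders messages per machine.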

If you just have a few end machines, then you could have a BlockingQueue per machine, and a thread per blocking queue sitting waiting for the next message. In this case, the pattern is more like this (beware: untested, off-the-top-of-my-head Sunday morning code):

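// needs java.net.Socket and java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue, TimeUnit}
ConcurrentHashMap<String, BlockingQueue<Message>> queuePerMachine =
    new ConcurrentHashMap<String, BlockingQueue<Message>>();

public void sendMessage(String machineId, byte[] message) throws InterruptedException {
  BlockingQueue<Message> q = queuePerMachine.get(machineId);
  if (q == null) {
    q = new LinkedBlockingQueue<Message>();  // BlockingQueue is an interface
    BlockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
    if (prev != null) {
      q = prev;  // another thread registered a queue first: use that one
    } else {
      (new QueueProcessor(q)).start();  // our queue won: start its processor thread
    }
  }
  q.put(new Message(message));
}

private class QueueProcessor extends Thread {
  private final BlockingQueue<Message> q;
  QueueProcessor(BlockingQueue<Message> q) {
    this.q = q;
  }
  public void run() {
    Socket s = null;
    try {
      for (;;) {
        // wait with a timeout only while a connection is open
        boolean needTimeOut = (s != null);
        Message m = needTimeOut ?
           q.poll(60000, TimeUnit.MILLISECONDS) :
           q.take();
        if (m == null) {
          // timed out, so s is open: close s and set it to null
        } else {
          if (s == null) {
            // open s
          }
          // send message down s
        }
      }
    } catch (InterruptedException e) {
      // asked to stop: fall through to the cleanup below
    } finally {
      // close s if it is still open; also handle IOException from open/send/close
    }
  }
}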

In this case, we close the connection if no message for that machine arrives within 60 seconds.

Should you use JMS instead? Well, you have to weigh up whether this sounds complicated to you. My personal feeling is that it isn't a complicated enough task to warrant a special framework, but I'm sure opinions differ.

P.S. Now that I look at this again, you'd probably put the queue inside the thread object and just map machine ID -> thread object. Anyway, you get the idea.
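
Following that P.S., a sketch of approach 2 with the queue inside the thread object and a map from machine ID to thread. `PerMachineDispatcher`, `MachineThread` and the `BiConsumer<String, String>` send hook are illustrative names; the hook stands in for the socket handling (open, send, close after idle) in `QueueProcessor` above:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

class PerMachineDispatcher {
    // machine ID -> the thread that owns that machine's queue
    private final ConcurrentHashMap<String, MachineThread> threads = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> send; // hypothetical socket-send hook

    PerMachineDispatcher(BiConsumer<String, String> send) {
        this.send = send;
    }

    public void sendMessage(String machineId, String message) {
        // computeIfAbsent runs the factory at most once per key, so each
        // machine gets exactly one thread even if callers race.
        MachineThread t = threads.computeIfAbsent(machineId, id -> {
            MachineThread created = new MachineThread(id);
            created.start();
            return created;
        });
        t.queue.add(message);
    }

    public void shutdown() throws InterruptedException {
        for (MachineThread t : threads.values()) {
            t.interrupt();
        }
        for (MachineThread t : threads.values()) {
            t.join();
        }
    }

    int threadCount() {
        return threads.size();
    }

    private class MachineThread extends Thread {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final String machineId;

        MachineThread(String machineId) {
            super("sender-" + machineId);
            this.machineId = machineId;
            setDaemon(true); // choice made for this sketch: idle senders don't keep the JVM alive
        }

        @Override
        public void run() {
            try {
                while (true) {
                    send.accept(machineId, queue.take()); // one machine's messages stay in order
                }
            } catch (InterruptedException e) {
                // shutdown requested: leave the loop
            }
        }
    }
}
```

Using `computeIfAbsent` replaces the `get` / `putIfAbsent` sequence in the code above with a single atomic call. Unlike a thread pool, this keeps each machine's messages in order, at the cost of one thread per machine.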

#5


You might try SomnifugiJMS, an in-VM JMS implementation that uses java.util.concurrent as its actual "engine".

It will probably be somewhat overkill for your purposes, but it may well let you distribute your application with little to no additional programming (if applicable): you just plug in a different JMS implementation, such as ActiveMQ, and you're done.
