3.5 线程
在高性能组件、框架和服务程序开发中,多线程并发处理的技术被广泛采用。合理的采用多线程,可以大大提高系统的处理能力和处理速度,提高硬件的利用率。多线程的控制能力,是组件开发者必备的基本技能。3.5.1 线程基础
我们先来看看如何创建一个线程。创建一个线程有2种方法,一种是从Thread继承出一个子类,另一种是实现Runnable 接口。具体如下: 1) 从Thread 继承:只需要覆盖Thread类的run方法,提供自己的线程处理逻辑代码即可,示例如下: public class MyThread extends Thread{ public void run() { // 线程处理逻辑代码写在这里 // ...... } } 启动线程,开始运行的代码如下: MyThread thread = new MyThread(); thread.start(); 2) 实现Runnable接口:Runnable 接口只有一个run方法,需要被实现,代码示例如下: public class MyThread2 implements Runnable{ public void run() { // 线程处理逻辑代码写在这里 // ...... } } 启动线程,开始运行的代码如下: Thread thread = new Thread(new MyThread2()); thread.start(); 由于Java只支持单继承,一旦类继承自Thread,则再也不能从其它类继承了。但接口没有此限制。因此,在实际应用中,如何自己的线程类一定要从其它的类继承,则必须选择实现Runnable接口的方式来创建线程。 当start 方法被调用后,则我们自己写的run 方法就会被执行,run 方法执行完后,线程就退出,结束了自己的生命周期。 通常情况下,我们都需要线程保持较长的生命周期而不结束,有任务需要处理时交给线程处理,没有任务时线程处于睡眠状态,让出CPU时间片。保持线程不退出的唯一办法就是run 方法不结束。因此,run的实现中,通常是个while 循环,保证线程不退出。 当程序结束的时候,我们需要让所有的线程结束运行,这时我们需要有相应的办法来让run 方法结束。 综合这两种场景,常见的解决方法是设置一个boolean 类型的变量,while 循环用这个变量作为循环条件。准备结束线程时,把这个变量设置为false,while 循环就结束了,线程就退出了。代码示例如下: public class WorkThread implements Runnable { private boolean canRun = true; // 线程运行控制变量 private int interval = 200; // 线程睡眠间隔 public WorkThread() { } // 结束线程 public void stopThread() { canRun = false; } // 线程处理方法 public void run() { while (canRun) { //... if (hasTask()) { // 有任务要处理 handleTask(); // 处理任务 } else { // 无任务时,线程睡眠 try { Thread.sleep(interval); } catch (Exception e) {} } } // 退出前要释放相关的资源 // ...... } private boolean hasTask() { // 请加入自己的逻辑判断代码 return false; } private void handleTask() { // 请加入自己的逻辑处理代码 } } 注意,Thread类提供了suspend()、resume()、stop()、destroy()方法,但这些方法都是Deprecated的,因此大家不要使用。 实际上,调用WorkThread.stopThread() 方法后,线程并没有马上退出,而是要等到handleTask() 处理完或Thread.sleep(interval) 返回后,进入下一次循环,while 才能判断canRun 变量变为false,从而退出while循环,然后执行while循环后面的语句,直至run() 方法的最后一个语句。 从以上分析过程可以看出,从调用WorkThread.stopThread()开始,到线程真正结束,中间的时间是不确定的。在一些情况下,我们需要准备获得线程是在什么时候结束的,从而保证资源被彻底释放,并且在线程结束后才可以执行一些特定的业务动作。这种情况下,我们可以使用Thread.join() 方法,示例如下: public class ThreadDemo { public static void main(String[] args) { WorkThread worker = new WorkThread(); Thread thread = new Thread(worker); thread.start(); // 其它的代码 // ...... // 线程结束 worker.stopThread(); // 等待线程结束 try { thread.join(3000); } catch (Exception e) { e.printStackTrace(); } } }3.5.2 多线程同步
由于同一进程的多个线程共享同一存储空间,这意味着这个进程内的线程都可以对进程内的同一数据进行读写,多线程并发对同一数据进行读写操作,会导致数据的不同步。下面举个简单的例子来说明: class Counter { private int iCounter = 0; public void add() { iCounter++; System.out.println(iCounter); } }class CounterThread extends Thread { private Counter counter; private int times; private boolean canRun = true; public CounterThread(Counter counter, int times) { this.counter = counter; this.times = times; } public void stopThread() { canRun = false; } public void run() { int i = 0; while (canRun) { counter.add(); Thread.yield(); if (++i >= times) { break; } } } } public class ThreadConflictDemo { public static void main(String[] args) { Counter c = new Counter(); CounterThread t1 = new CounterThread(c, 100); CounterThread t2 = new CounterThread(c, 100); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (Exception e) { e.printStackTrace(); } }
} 代码比较简单,ThreadConflictDemo的main函数中,先创建了一个Counter对象,代表内存中的一个共享数据,然后创建了2个线程t1和t2,这两个线程都会调用Counter对象的add方法100次,将iCounter 递增1,然后打印输出。因此,我们期望,控制台会输出共200行,每行数值和行号相同,如下: 1 2 3 …… 199 200 但实际上,由于多线程对同一数据进行读写,没有任何同步保护机制,最终输出的结果未必会同我们期望的那样。下面解释一下: t1 和 t2 两个线程,被CPU调度,因此,谁先执行,谁后执行是不确定的,不同的CPU有不同的调度策略。在一个指定的时间点,到底那个线程被加载到CPU中执行,也是不确定的。因此,我们假设某一次运行过程,是按下面的顺序来执行的: t1 t2 public void add() { public void add() { iCounter++; (1) iCounter++; (2) println(iCounter); (3) println(iCounter); (4) } } iCounter 初始值为0. 按照时间的先后顺序,依次执行为: (1) t1 线程执行了iCounter++;语句,执行完毕后,iCounter 值为1 (2) t2线程执行了iCounter++;语句,执行完毕后,iCounter 值为2 (3) t1 线程执行了打印语句,此时iCounter值为2,因此输出2 (4) t2 线程执行了打印语句,此时iCounter值为2,因此又输出2 这样,输出了2行,值都是2,而值1没有输出,没有输出预期的结果。 因此,多线程对同一数据进行读写操作时,必须采用相应的技术手段避免上述的情况发生。这种技术手段叫同步,Java中用synchronized 关键字实现同步,它包括两种用法:synchronized 方法和 synchronized 块。 1. synchronized 方法:通过在方法声明中加入 synchronized关键字来声明 synchronized 方法,如: public synchronized void addValue(int newVal); synchronized 方法控制对类成员变量的访问:每个类实例对应一把锁,每个 synchronized 方法都必须获得调用该方法的类实例的锁方能执行,否则所属线程阻塞,方法一旦执行,就独占该锁,直到从该方法返回时才将锁释放,此后被阻塞的线程方能获得该锁,重新进入可执行状态。这种机制确保了同一时刻对于每一个类实例,其所有声明为 synchronized 的成员函数中至多只有一个处于可执行状态(因为至多只有一个能够获得该类实例对应的锁),从而有效避免了类成员变量的访问冲突(只要所有可能访问类成员变量的方法均被声明为 synchronized)。 在 Java 中,不光是类实例,每一个类也对应一把锁,这样我们也可将类的静态成员函数声明为 synchronized ,以控制其对类的静态成员变量的访问。 synchronized 方法的缺陷:若将一个大的方法声明为synchronized 将会大大影响效率,因为一个方法内会有很多逻辑处理代码,并不是所有的处理过程都需要同步。我们可以选择,只同步需要同步的代码片段,这就是另外一种同步方法: synchronized 块。 2. synchronized 块:通过 synchronized关键字来声明synchronized 块。语法如下: synchronized(syncObject) { //允许访问控制的代码 } synchronized 块是这样一个代码块,其中的代码必须获得对象 syncObject (如前所述,可以是类实例或类)的锁方能执行,具体机制同前所述。由于可以针对任意代码块,且可任意指定上锁的对象,故灵活性较高。
3.5.3 线程的阻塞
为了解决对共享存储区的访问冲突,Java 引入了同步机制,现在让我们来考察多个线程对共享资源的访问,显然同步机制已经不够了,因为在任意时刻所要求的资源不一定已经准备好了被访问,反过来,同一时刻准备好了的资源也可能不止一个。为了解决这种情况下的访问控制问题,Java 引入了对阻塞机制的支持。 阻塞指的是暂停一个线程的执行以等待某个条件发生(如某资源就绪),学过操作系统的同学对它一定已经很熟悉了。Java 提供了大量方法来支持阻塞,下面让我们逐一分析。 1. sleep() 方法:sleep() 允许 指定以毫秒为单位的一段时间作为参数,它使得线程在指定的时间内进入阻塞状态,不能得到CPU 时间,指定的时间一过,线程重新进入可执行状态。 典型地,sleep() 被用在等待某个资源就绪的情形:测试发现条件不满足后,让线程阻塞一段时间后重新测试,直到条件满足为止。 2. yield() 方法:yield() 使得线程放弃当前分得的 CPU 时间,但是不使线程阻塞,即线程仍处于可执行状态,随时可能再次分得 CPU 时间。调用 yield() 的效果等价于调度程序认为该线程已执行了足够的时间从而转到另一个线程。 3. wait() 和 notify() 方法:两个方法配套使用,wait() 使得线程进入阻塞状态,它有两种形式,一种允许 指定以毫秒为单位的一段时间作为参数,另一种没有参数,前者当对应的 notify() 被调用或者超出指定时间时线程重新进入可执行状态,后者则必须对应的 notify() 被调用。 上述的核心区别导致了一系列的细节上的区别。 首先,前面叙述的所有方法都隶属于 Thread 类,但是这一对却直接隶属于 Object 类,也就是说,所有对象都拥有这一对方法。初看起来这十分不可思议,但是实际上却是很自然的,因为这一对方法阻塞时要释放占用的锁,而锁是任何对象都具有的,调用任意对象的 wait() 方法导致线程阻塞,并且该对象上的锁被释放。而调用 任意对象的notify()方法则导致因调用该对象的 wait() 方法而阻塞的线程中随机选择的一个解除阻塞(但要等到获得锁后才真正可执行)。 其次,前面叙述的所有方法都可在任何位置调用,但是这一对方法却必须在 synchronized 方法或块中调用,理由也很简单,只有在synchronized 方法或块中当前线程才占有锁,才有锁可以释放。同样的道理,调用这一对方法的对象上的锁必须为当前线程所拥有,这样才有锁可以释放。因此,这一对方法调用必须放置在这样的 synchronized 方法或块中,该方法或块的上锁对象就是调用这一对方法的对象。若不满足这一条件,则程序虽然仍能编译,但在运行时会出现IllegalMonitorStateException 异常。 wait() 和 notify() 方法的上述特性决定了它们经常和synchronized 方法或块一起使用,将它们和操作系统的进程间通信机制作一个比较就会发现它们的相似性:synchronized方法或块提供了类似于操作系统原语的功能,它们的执行不会受到多线程机制的干扰,而这一对方法则相当于 block 和wakeup 原语(这一对方法均声明为 synchronized)。它们的结合使得我们可以实现操作系统上一系列精妙的进程间通信的算法(如信号量算法),并用于解决各种复杂的线程间通信问题。 关于 wait() 和 notify() 方法最后再说明两点: 第一:调用 notify() 方法导致解除阻塞的线程是从因调用该对象的 wait() 方法而阻塞的线程中随机选取的,我们无法预料哪一个线程将会被选择,所以编程时要特别小心,避免因这种不确定性而产生问题。 第二:除了 notify(),还有一个方法 notifyAll() 也可起到类似作用,唯一的区别在于,调用 notifyAll() 方法将把因调用该对象的 wait() 方法而阻塞的所有线程一次性全部解除阻塞。当然,只有获得锁的那一个线程才能进入可执行状态。 谈到阻塞,就不能不谈一谈死锁,不指定超时期限的 wait() 方法的调用都可能产生死锁。遗憾的是,Java 并不在语言级别上支持死锁的避免,我们在编程中必须小心地避免死锁。3.5.4 守护线程
守护线程是一类特殊的线程,它和普通线程的区别在于它并不是应用程序的核心部分,当一个应用程序的所有非守护线程终止运行时,即使仍然有守护线程在运行,应用程序也将终止,反之,只要有一个非守护线程在运行,应用程序就不会终止。守护线程一般被用于在后台为其它线程提供服务。 可以通过调用方法 isDaemon() 来判断一个线程是否是守护线程,也可以调用方法 setDaemon() 来将一个线程设为守护线程。3.5.5 多线程范例
最后,综合上面讲解的多线程知识,给出一个多线程典型应用的例子。 具体场景如下: 一个线程做为生产者,生产产品,另一个线程作为消费者,消费产品。但生产者和消费者不能有直接的访问,这样可以保持松耦合。另外,如果生产速度太快,消费速度太慢,则产品会严重积压。因此,生产者和消费者之间要进行一定生产速度和消费速度的平衡。 具体实现思路如下: 生产者是一个线程,消费者是另外一个线程,这两个线程之间通过一个队列来传送产品。生产者线程生产产品,然后把产品放到队列中,消费者线程从队列中依次取出产品,进行消费。为了避免产品积压,设定队列中最大积压的产品数量,生产者线程会查询队列中的产品数量,如果已经达到了设定的积压数量,则暂停生产,等待消费者消费掉产品。 先看一下队列类的实现,核心的线程同步代码都在这里面: import java.util.*;public class Queue { // 队列的最大长度 private int queueMaxLength = Integer.MAX_VALUE; // 用LinkedList存放队列元素 private LinkedList list = new LinkedList(); public Queue() { } public Queue(int queueMaxLength) { this.queueMaxLength = queueMaxLength; } public int getQueueMaxLength() { return queueMaxLength; }
public void setQueueMaxLength(int queueMaxLength) { this.queueMaxLength = queueMaxLength; } // 获得队列中当前的元素数量 public int getQueueSize() { return list.size(); } // 把对象放入队列尾部,返回true为成功,返回false表示队列已满无法放入 public synchronized boolean put(Object obj) { if (list.size() < queueMaxLength) { list.add(obj); notifyAll(); // 通知其它线程 return true; } return false; } // 把对象放入队列头部,返回true为成功,返回false表示队列已满无法放入 public synchronized boolean putHead(Object obj) { if (list.size() < queueMaxLength) { list.addFirst(obj); notifyAll(); // 通知其它线程 return true; } return false; } // 获取队列的头部元素,并从队列中移除。如果队列为空,则会一直阻塞 public synchronized Object poll() { waitWhenNoElement(-1); return list.poll(); } // 获取队列的头部元素,并从队列中移除。如果队列为空,则会阻塞,直至超时 // timeout : 超时时间,单位毫秒。如果为0,表示不阻塞等待,直接返回。 public synchronized Object poll(int timeout) { waitWhenNoElement(timeout); return list.poll(); } // 获取队列的头部元素,不从队列中移除。如果队列为空,则会一直阻塞 public synchronized Object peek() { waitWhenNoElement(-1); return list.peek(); }
// 获取队列的头部元素,不从队列中移除。如果队列为空,则会阻塞,直至超时 // timeout : 超时时间,单位毫秒。如果为0,表示不阻塞等待,直接返回。 public synchronized Object peek(int timeout) { waitWhenNoElement(timeout); return list.peek(); } // 当队列为空时等待 private void waitWhenNoElement(int timeout) { if (list.size() < 1) { // 队列为空 try { if (timeout < 0) // 小于0,表示无限等下去 wait(); else if (timeout > 0) // 大于0时,只等待指定的时间. timeout=0时不等待 wait(timeout); } catch (Exception e) {} } } } 另外为了简化线程的编写,又写了一个线程基类BaseThread,如下: public class BaseThread extends Thread{ private boolean bRun = true; // 线程是否可以继续运行 protected boolean canRun() { return bRun; } // 停止线程 public void stopThread() { bRun = false; } // 睡眠 public static void sleep(int millis) { try { Thread.sleep(millis); } catch (Exception e) {} } } 生产者线程如下: public class Producer extends BaseThread{ private Queue queue; private int productNo = 0; // 产品编号 public Producer(Queue queue) { this.queue = queue; } public void run() { while (canRun()) { // 先判断队列是否已满 if (queue.getQueueSize() >= queue.getQueueMaxLength()) { System.out.println("Queue is full, pause..."); sleep(500); // 队列满的时候,暂停生产 } else { // 队列未满的时候,可以生产 Integer product = newProduct(); // 生产一个商品 System.out.println("Produce product, id = " + product); queue.put(product); // 放入队列 Thread.yield(); // 让出时间片 } } } // 生产产品 private Integer newProduct() { // 用Integer对象来模拟一个产品 Integer product = new Integer(++productNo); sleep(100); // 模拟生产所需要消耗的时间 return product; } } 消费者线程如下: public class Consumer extends BaseThread { private Queue queue; public Consumer(Queue queue) { this.queue = queue; } public void run() { while (canRun()) { Object obj = queue.poll(500); if (obj != null) { // comsumeProduct(obj); Thread.yield(); } } } // 消费掉产品 private void comsumeProduct(Object obj) { Integer product = (Integer)obj; System.out.println("Comsume product, id = " + product); sleep(500); // 模拟消费产品所需要的时间 } } 演示线程同步的测试类,如下: public class SynchronizeDemo { public static void main(String[] args) { Queue queue = new Queue(20); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); producer.start(); consumer.start(); BaseThread.sleep(5000); producer.stopThread(); consumer.stopThread(); try { producer.join(20000); consumer.join(20000); } catch (Exception e) {} } } 运行后,控制台输出结果如下: Produce product, id = 1 Comsume product, id = 1 Produce product, id = 2 Produce product, id = 3 Produce product, id = 4 Produce product, id = 5 Comsume product, id = 2 Produce product, id = 6 Produce product, id = 7 Produce product, id = 8 Produce product, id = 9 Produce product, id = 10 Comsume product, id = 3 Produce product, id = 11 Produce product, id = 12 Produce product, id = 13 Produce product, id = 14 Comsume product, id = 4 Produce product, id = 15 Produce product, id = 16 Produce product, id = 17 Produce product, id = 18 Produce product, id = 19 Comsume product, id = 5 Produce product, id = 20 Produce product, id = 21 Produce product, id = 22 Produce product, id = 23 Comsume product, id = 6 Produce product, id = 24 Produce product, id = 25 Produce product, id = 26 Queue is full, pause... Comsume product, id = 7 Produce product, id = 27 Queue is full, pause... Comsume product, id = 8 Produce product, id = 28 Queue is full, pause... Comsume product, id = 9 Comsume product, id = 10 Produce product, id = 29 Produce product, id = 30 Queue is full, pause... 由于是多线程并发,每次运行的输出结果可能不尽相同。
本文出自 “expert” 博客,转载请与作者联系!