在之前,已经学习到了线程的创建和状态控制,但是每个线程之间几乎都没有什么太大的联系。可是有的时候,可能存在多个线程多同一个数据进行操作,这样,可能就会引用各种奇怪的问题。现在就来学习多线程对数据访问的控制吧。
由于同一进程的多个线程共享同一片存储空间,在带来方便的同时,也带来了访问冲突这个严重的问题。Java语言提供了专门机制以解决这种冲突,有效避免了同一个数据对象被多个线程同时访问。
一、多线程引起的数据访问安全问题
下面看一个经典的问题,银行取钱的问题:
1)、你有一张银行卡,里面有5000块钱,然后你到取款机取款,取出3000,当正在取的时候,取款机已经查询到你有5000块钱,然后正准备减去300块钱的时候
2)、你的老婆拿着那张银行卡对应的存折到银行取钱,也要取3000.然后银行的系统查询,存折账户里还有6000(因为上面钱还没扣),所以它也准备减去3000,
3)、你的卡里面减去3000,5000-3000=2000,并且你老婆的存折也是5000-3000=2000。
4)、结果,你们一共取了6000,但是卡里还剩下2000。
下面看程序的模拟过程:
- package com.tao.test;
- public class GetMoneyTest {
- public static void main(String[] args) {
- Account account = new Account(5000);
- GetMoneyRun runnable = new GetMoneyRun(account);
- new Thread(runnable, "你").start();
- new Thread(runnable, "你老婆").start();
- }
- }
- // 账户Mode
- class Account {
- private int money;
- public Account(int money) {
- super();
- this.money = money;
- }
- public int getMoney() {
- return money;
- }
- public void setMoney(int money) {
- this.money = money;
- }
- }
- //runnable类
- class GetMoneyRun implements Runnable {
- private Account account;
- public GetMoneyRun(Account account) {
- this.account = account;
- }
- @Override
- public void run() {
- if (account.getMoney() > 3000) {
- System.out.println(Thread.currentThread().getName() + "的账户有"
- + account.getMoney() + "元");
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- int lasetMoney=account.getMoney() - 3000;
- account.setMoney(lasetMoney);
- System.out.println(Thread.currentThread().getName() + "取出来了3000元"
- + Thread.currentThread().getName() + "的账户还有"
- + account.getMoney() + "元");
- } else {
- System.out.println("余额不足3000" + Thread.currentThread().getName()
- + "的账户只有" + account.getMoney() + "元");
- }
- }
- }
多次运行程序,可以看到有多种不同的结果,下面是其中的三种:
- 你的账户有5000元
- 你老婆的账户有5000元
- 你老婆取出来了3000元你老婆的账户还有2000元
- 你取出来了3000元你的账户还有-1000元
- 你的账户有5000元
- 你老婆的账户有5000元
- 你老婆取出来了3000元你老婆的账户还有-1000元
- 你取出来了3000元你的账户还有-1000元
- 你的账户有5000元
- 你老婆的账户有5000元
- 你老婆取出来了3000元你老婆的账户还有2000元
- 你取出来了3000元你的账户还有2000元
可以看到,由于有两个线程同时访问这个account对象,导致取钱发生的账户发生问题。当多个线程访问同一个数据的时候,非常容易引发问题。为了避免这样的事情发生,我们要保证线程同步互斥,所谓同步互斥就是:并发执行的多个线程在某一时间内只允许一个线程在执行以访问共享数据。
二、同步互斥锁
同步锁的原理:Java中每个对象都有一个内置同步锁。Java中可以使用synchronized关键字来取得一个对象的同步锁。synchronized的使用方式,是在一段代码块中,加上synchronized(object){ ... }
例如,有一个show方法,里面有synchronized的代码段:
- public void show() {
- synchronized(object){
- ......
- }
- }
这其中的object可以使任何对象,表示当前线程取得该对象的锁。一个对象只有一个锁,所以其他任何线程都不能访问该对象的所有由synchronized包括的代码段,直到该线程释放掉这个对象的同步锁(释放锁是指持锁线程退出了synchronized同步方法或代码块)。
注意:synchronized使用方式有几个要注意的地方(还是以上面的show方法举例):
①、取得同步锁的对象为this,即当前类对象,这是使用的最多的一种方式
- public void show() {
- synchronized(this){
- ......
- }
- }
②、将synchronized加到方法上,这叫做同步方法,相当于第一种方式的缩写
- public synchronized void show() {
- }
③、静态方法的同步
- public static synchronized void show() {
- }
相当于
- public static void show() {
- synchronized(当前类名.class)
- }
相当于取得类对象的同步锁,注意它和取得一个对象的同步锁不一样
- @Override
- public void run() {
- synchronized (account) {
- if (account.getMoney() > 3000) {
- System.out.println(Thread.currentThread().getName() + "的账户有"
- + account.getMoney() + "元");
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- int lasetMoney = account.getMoney() - 3000;
- account.setMoney(lasetMoney);
- System.out.println(Thread.currentThread().getName()
- + "取出来了3000元" + Thread.currentThread().getName()
- + "的账户还有" + account.getMoney() + "元");
- } else {
- System.out.println("余额不足3000"
- + Thread.currentThread().getName() + "的账户只有"
- + account.getMoney() + "元");
- }
- }
- }
当甲线程执行run方法的时候,它使用synchronized (account)取得了account对象的同步锁,那么只要它没释放掉这个锁,那么当乙线程执行到run方法的时候,它就不能获得继续执行的锁,所以只能等甲线程执行完,然后释放掉锁,乙线程才能继续执行。
synchronized关键字使用要注意以下几点:
1)、只能同步方法和代码块,而不能同步变量和类。只要保护好类中数据的安全访问和设置就可以了,不需要对类使用synchronized关键字,所以Java不允许这么做。并且想要同步数据,只需要对成员变量私有化,然后同步方法即可,不需要对成员变量使用synchronized,java也禁止这么做。
2)、每个对象只有一个同步锁;当提到同步时,应该清楚在什么上同步?也就是说,在哪个对象上同步?上面的代码中run方法使用synchronized (account)代码块,因为两个线程访问的都是同一个Account对象,所以能够锁定。但是如果是其他的一个无关的对象,就没用了。比如说synchronized (new Date())代码块,一样没有效果。
3)、不必同步类中所有的方法,类可以同时拥有同步和非同步方法。
5)、如果线程拥有同步和非同步方法,则非同步方法可以被多个线程*访问而不受锁的限制。
6)、线程睡眠时,它所持的任何同步锁都不会释放。
8)、同步损害并发性,应该尽可能缩小同步范围。同步不但可以同步整个方法,还可以同步方法中一部分代码块。
9)、编写线程安全的代码会使系统的总体效率会降低,要适量使用
一个线程取得了同步锁,那么在什么时候才会释放掉呢?
1、同步方法或代码块正常结束
2、使用return或 break终止了执行,或者跑出了未处理的异常。
3、当线程执行同步方法或代码块时,程序执行了同步锁对象的wait()方法。
三、死锁
死锁:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不能正常运行。简单的说就是:线程死锁时,第一个线程等待第二个线程释放资源,而同时第二个线程又在等待第一个线程释放资源。这里举一个通俗的例子:如在人行道上两个人迎面相遇,为了给对方让道,两人同时向一侧迈出一步,双方无法通过,又同时向另一侧迈出一步,这样还是无法通过。假设这种情况一直持续下去,这样就会发生死锁现象。
导致死锁的根源在于不适当地运用“synchronized”关键词来管理线程对特定对象的访问。“synchronized”关键词的作用是,确保在某个时刻只有一个线程被允许执行特定的代码块,因此,被允许执行的线程首先必须拥有对变量或对象的排他性访问权。当线程访问对象时,线程会给对象加锁,而这个锁导致其它也想访问同一对象的线程被阻塞,直至第一个线程释放它加在对象上的锁。
一个死锁的造成很简单,比如有两个对象A 和 B 。第一个线程锁住了A,然后休眠1秒,轮到第二个线程执行,第二个线程锁住了B,然后也休眠1秒,然后有轮到第一个线程执行。第一个线程又企图锁住B,可是B已经被第二个线程锁定了,所以第一个线程进入阻塞状态,又切换到第二个线程执行。第二个线程又企图锁住A,可是A已经被第一个线程锁定了,所以第二个线程也进入阻塞状态。就这样,死锁造成了。
举个例子:
- package com.tao.test;
- public class DeadLock2 {
- public static void main(String[] args) {
- Object object1=new Object();
- Object object2=new Object();
- new Thread(new T(object1,object2)).start();
- new Thread(new T(object2,object1)).start();
- }
- }
- class T implements Runnable{
- private Object object1;
- private Object object2;
- public T(Object object1,Object object2) {
- this.object1=object1;
- this.object2=object2;
- }
- public void run() {
- synchronized (object1) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- synchronized (object2) {
- System.out.println("无法执行到这一步");
- }
- }
- };
- }
上面的就是个死锁。
第一个线程首先锁住了object1,然后休眠。接着第二个线程锁住了object2,然后休眠。在第一个线程企图在锁住object2,进入阻塞。然后第二个线程企图在锁住object1,进入阻塞。死锁了。
四、线程的协调运行
关于线程的协调运行,经典的例子就是生产者和消费者的问题。比如有生产者不断的生产馒头,放入一个篮子里,而消费者不断的从篮子里拿馒头吃。并且,当篮子满的时候,生产者通知消费者来吃馒头,并且自己等待不在生产馒头。当篮子没满的的时候,由消费者通知生产者生产馒头。这样不断的循环。
要完成上面的功能,光靠我们前面的同步等知识,是不能完成的。而是要用到线程间的协调运行。*父类Object中有3种方法来控制线程的协调运行。
notify、notifyAll、wait。其中wait有3个重载的方法。
这三个方法必须由同步监视器对象(即线程获得的锁对象)来调用,这可分为两种情况:
1、对于使用synchronized修饰的同步代码块,因为当前的类对象(this)就是同步监视器,所以可以再同步方法中直接调用这三个方法。
2、对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号的对象,所以必须使用该对象调用这三个方法。
wait(): 导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。wait()方法有三种形式:无时间参数的wait(一直等待,直到其他线程通知),带毫秒参数的wait和带毫秒、微秒参数的wait(这两种方法都是等待指定时间后自动苏醒)。调用wait()方法的当前线程会释放对该同步监视器的锁定。
notify(): 唤醒在此同步监视器上等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择幻想其中一个线程。选择是任意性。只有当前线程放弃对该同步监视器的锁定后(使用wait()方法),才可以执行被唤醒的其他线程。
notifyAll():唤醒在此同步监视器上等待的所有线程。只有当前线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。
因为使用wait、notify和notifyAll三个方法一定是在同步代码块中使用的,所以一定要明白下面几点:
1、如果两个线程是因为都要得到同一个对象的锁,而导致其中一个线程进入阻塞状态。那么只有等获得锁的线程执行完毕,或者它执行了该锁对象的wait方法,阻塞的线程才会有机会得到锁,继续执行同步代码块。
2、使用wait方法进入等待状态的线程,会释放掉锁。并且只有其他线程调用notify或者notifyAll方法,才会被唤醒。要明白,线程因为锁阻塞和等待是不同的,因为锁进入阻塞状态,会在其他线程释放锁的时候,得到锁在执行。而等待状态必须要靠别人唤醒,并且唤醒了也不一定会立刻执行,有可能因为notifyAll方法使得很多线程被唤醒,多个线程等待同一个锁,而进入阻塞状态。还可能是调用notify的线程依然没有释放掉锁,只有等他执行完了,其他线程才能去争夺这个锁。
看下面的例子:
- package com.tao.test;
- public class ThreadA {
- public static void main(String[] args) {
- RunnableTest myRunnanle=new RunnableTest();
- new Thread(myRunnanle).start();
- synchronized (myRunnanle) {
- try {
- System.out.println("第一步");
- myRunnanle.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("第四步");
- }
- }
- }
- class RunnableTest implements Runnable {
- public void run() {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- synchronized (this) {
- System.out.println("第二步");
- notify();
- System.out.println("第三步");
- }
- }
- }
有两个线程,主线程和我们自己新建的子线程。一步步的分析程序的执行:
1、因为子线程启动后,调用了sleep,所以主线程先进入同步代码块,而子线程之后因为没有锁,会进入阻塞状态。
2、主线程的同步代码块执行,打印第一句话,然后调用wait方法,进入等待状态。因为进入了等待状态,所以释放掉了锁,所以子线程可以获得锁,开始执行。
3、子线程执行,打印第二句话,然后调用notify方法,将主线程唤醒。可是子线程并没有结束,依然持有锁,所以主线程不得不进入阻塞状态,等待这个锁。
4、子线程打印第三句话,然后线程正常运行结束,释放掉锁。然后主线程得到了锁,从阻塞进入运行状态,打印第四句话。
5、完毕
在看一个关于上面提到的生产者和消费者的例子:
首先,是生产物品的Mode,这里以馒头举例:
- // 馒头的实例
- class ManTou {
- private int id;// 馒头的id
- public ManTou(int id) {
- this.id = id;
- }
- public String toString(){
- return "ManTou"+id;
- }
- }
共享对象,生产者生产的馒头放入其中,消费者从里面拿出馒头,这里以篮子举例:
- // 篮子的实例,用来放馒头
- class BasketBall {
- private int index = 0;// 表示装到第几个了馒头
- private ManTou[] manTous = new ManTou[6];// 可以放6个馒头
- // 放进去一个馒头
- public synchronized void push(ManTou manTou) {
- while(index==manTous.length){
- try {
- System.out.println("篮子满了!");
- this.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println(Thread.currentThread().getName()+"生产"+manTou.toString());
- this.notify();
- manTous[index] = manTou;
- index++;
- }
- // 拿一个馒头
- public synchronized ManTou pop() {
- while (index==0) {
- try {
- System.out.println("篮子空了!");
- this.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- ManTou manTou=manTous[--index];
- System.out.println(Thread.currentThread().getName()+"吃了"+manTou.toString());
- this.notify();
- return manTou;
- }
- }
生产者:
- // 生产者,生产馒头
- class Producer implements Runnable {
- private BasketBall basketBall;
- public Producer(BasketBall basketBall) {
- this.basketBall = basketBall;
- }
- @Override
- public void run() {
- for (int i = 0; i < 20; i++) {
- ManTou manTou = new ManTou(i);// 生产馒头
- basketBall.push(manTou);
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- // 消费者,拿馒头吃
- class Consumer implements Runnable {
- private BasketBall basketBall;
- public Consumer(BasketBall basketBall) {
- this.basketBall = basketBall;
- }
- @Override
- public void run() {
- for (int i = 0; i < 20; i++) {
- ManTou manTou=basketBall.pop();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
测试:
- public class ProducerConsumer {
- public static void main(String[] args) {
- BasketBall basketBall=new BasketBall();
- new Thread(new Producer(basketBall)).start();
- new Thread(new Consumer(basketBall)).start();
- }
- }
这个可以类比于C++ ,同步对象是mutex,condition_variable是和mutex绑定的,当调用condition_variable.wait()的时候,会先释放掉绑定的mutex,被唤醒后,重新lock
两者的区别在于,C++的condition_variable是需要显示定义,并需要和mutex绑定。也就是说candition_variable是两个对象。
而在java中是一个对象。
另外,java也有lock/condition机制,见 http://lavasoft.blog.51cto.com/62575/222084
贴一个C++ blockingQueueu的实现,体会下
#include <condition_variable>
#include <mutex>
#include <deque>
#include "aggregator/common/const_define.h" AGGREGATOR_BEGIN_NAMESPACE template<typename T>
class BlockingQueue {
public:
void Put(const T& item) {
std::lock_guard<std::mutex> lock(_mutex);
_queue.push_back(std::move(item));
_not_empty_cond.notify_one();
} T Get() {
std::unique_lock<std::mutex> lock(_mutex);
_not_empty_cond.wait(lock, [this] { return !_queue.empty(); });
T front(std::move(_queue.front()));
_queue.pop_front();
return front;
} size_t Size() const
{
std::lock_guard<std::mutex> lock (_mutex);
return _queue.size();
}
private:
mutable std::mutex _mutex;
std::deque<T> _queue;
std::condition_variable _not_empty_cond;
};