多线程--同步互斥&生产者消费者(一)

时间:2021-11-01 20:20:48

一、线程同步&互斥

在实际应用中,多个线程往往会共享一些数据,并且各个线程之间的状态和行为是相互影响的。线程之间的影响有两种,一种是线程间的互斥,另一种是线程间的同步。
---线程安全(Thread-Safe)
---线程间的互斥
---线程间的同步
线程之间的关系大致可以分为两类
1、线程之间通过对资源的竞争,包括共享的数据和硬件资源,所产生的相互制约关系,这类线程间的主要问题是互斥和死锁问题,这类关系被称为互斥关系。
2、线程之间的相互协同合作,彼此之间直接知道对方的存在,并了解对方的名字,这类进程常常需要通过“进程间通信”方法来协同工作,这类关系被称为同步关系。
--共享资源是指在程序中并发运行的 若干个线程所操作的同一数据资源。
--并发运行的若干线程在操作共享资源时可能会出错,通常把这种现象称为非线程安全。反之,则称为线程安全
--线程间的互斥
出现共享资源访问冲突的实质是线程没有互斥的使用共享资源,即单个线程需要独占性的使用共享资源。
共享资源也称为“临界资源”,而将线程中访问共享变量的代码段称为临界段。为了使系统中并行线程正确而有效的访问资源,对线程互斥使用临界有以下原则:
1、在共享同一个临界资源的所有线程中,每次只允许有一个线程处于它的临界段之中。也就是说强制所有这些线程中,每次只允许其中的一个线程访问该共享变量。
2、线程只应在临界段内逗留有限时间。
3、若有多个线程同时要求进入它们的临界段时,就在有限的时间内让其中之一进入临界段,而不应相互阻塞,以至于各线程均无法进入临界段。
为了能够对临界段实现互斥,计算机科学家提出使用“同步愿语”来访问“信号量”的方式来解决。
信号量是指用于协调多个相互关联线程操作的信号,当一个线程需要访问临界段时,将被强制地停在临界段的访问入口处,直到收到一个专门的信号,才能进入临界段。
同步原语,通常用来保证临界段代码执行的原子性,即不可中断性。
Java中关键字 “synchronized”用于实现语句的同步操作,即给共享资源加互斥锁


锁定一个对象和一段代码
声明格式为:
synchronized(<对象名 >){

 <语句组>

}

锁定一个方法
声明格式为:
synchronized<方法声明 >{
 
 <方法体>

}

1、线程间的互斥

无论是对方法加互斥锁,还是对对象加互斥锁,其实质都是实现对共享资源的互斥访问
互斥操作是以降低应用程序的并发程度为代价的
因此,在编写多线程程序中,对synchronized的使用就遵循以下两个原则:
    -不需要再多个线程中使用共享资源时,那么就没有必要使用该关键字;
    -如果某个方法只是返回对象的值,而不去修改对象的值时,那么也就没有必要使用该关键字。


2、线程间的同步:

Java中Object类提供了一组关于线程同步的方法
public final void wait(long timeout)
这个方法将使得执行该方法的线程对象被阻塞

注意Thread类的sleep方法和Object类的wait方法,均可以用于将线程的状态由运行状态转为不可运行状态,
但二者在等待时间上是有区别的,sleep()方法的等待时间是确定的,到时由系统唤醒,而wait方法的等待时间是不确定的,需要由线程通过notify()/notifyAll()方法来唤醒。

public final void notify()
唤醒被wait方法阻塞的单个线程
public final void notifyAll()
唤醒全被被wait方法阻塞的线程

 

小结:
1、线程的优先级:

(1)线程的优先级表示线程的重要程度
(2)在默认条件下,一个线程将继承其父线程的优先级
(3)线程优先级的使用原则与操作系统有着密切的联系因此在JAVA中的线程的调度是完全受其所运行平台的操作系统的线程调度程序控制的
(4)可以使用getPriority()和setPriority()方法来获取与设定线程的优先级。

2、线程组
(1)线程组提供了一个统一管理多个线程而不需要单独管理的机制
(2)在JAVA语言中用java.lang,ThreadGroup类来支持线程组的实现
(3)当多个线程分为一组时,可以用一个方法启动或挂起线程组中的所有线程

3、守护线程
(1)守护线程,也称后台线程,是指在程序运行的时候在后台提供一种通用服务的线程
(2)守护线程不会阻止程序的终止
(3)使用setDaemon方法设定一个线程为守护线程。
(4)要创建一个守护线程必须在调用它的start()方法之前设置

二、线程同步的五种方法

    1、同步方法 
    即有synchronized关键字修饰的方法。 由于java的每个对象都有一个内置锁,当用此关键字修饰方法时,  内置锁会保护整个方法。在调用该方法前,需要获得内置锁,否则就处于阻塞状态。
    代码如: 

    public synchronized void save(){}
   注: synchronized关键字也可以修饰静态方法,此时如果调用该静态方法,将会锁住整个类

    2、同步代码块 
    即有synchronized关键字修饰的语句块。 被该关键字修饰的语句块会自动被加上内置锁,从而实现同步
    代码如: 

synchronized(object){ 
}
 
    注:同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。  
    代码实例: 

package com.showhair.thread;

/**
* 线程同步的运用
*
* @author XIEHEJUN
*
*/
public class SynchronizedThread {

class Bank {

private int account = 100;

public int getAccount() {
return account;
}

/**
* 用同步方法实现
*
* @param money
*/
public synchronized void save(int money) {
account += money;
}

/**
* 用同步代码块实现
*
* @param money
*/
public void save1(int money) {
synchronized (this) {
account += money;
}
}
}

class NewThread implements Runnable {
private Bank bank;

public NewThread(Bank bank) {
this.bank = bank;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
// bank.save1(10);
bank.save(10);
System.out.println(i + "账户余额为:" + bank.getAccount());
}
}

}

/**
* 建立线程,调用内部类
*/
public void useThread() {
Bank bank = new Bank();
NewThread new_thread = new NewThread(bank);
System.out.println("线程1");
Thread thread1 = new Thread(new_thread);
thread1.start();
System.out.println("线程2");
Thread thread2 = new Thread(new_thread);
thread2.start();
}

public static void main(String[] args) {
SynchronizedThread st = new SynchronizedThread();
st.useThread();
}

}
    

    3、volatile(特殊域变量)
    (1)volatile关键字为域变量的访问提供了一种免锁机制, 
    (2)使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新, 
    (3)因此每次使用该域就要重新计算,而不是使用寄存器中的值 
    (4)volatile不会提供任何原子操作,它也不能用来修饰final类型的变量 
    
    例如: 
        在上面的例子当中,只需在account前面加上volatile修饰,即可实现线程同步。 
    
    代码实例: 

 //只给出要修改的代码,其余代码与上同
class Bank {
//需要同步的变量加上volatile
private volatile int account = 100;

public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
account += money;
}
    4、 ReentrantLock(重用锁)

    ReentrantLock类是可重入、互斥、实现了Lock接口的锁, 它与使用synchronized方法和快具有相同的基本行为和语义,并且扩展了其能力
    ReenreantLock类的常用方法有:
    (1)ReentrantLock() : 创建一个ReentrantLock实例 
    (2)lock() : 获得锁

    (3)unlock() : 释放锁 
    注:ReentrantLock()还有一个可以创建公平锁的构造方法,但由于能大幅度降低程序运行效率,不推荐使用 
    例如: 
        在上面例子的基础上,改写后的代码为: 

    
    代码实例: 

//只给出要修改的代码,其余代码与上同
class Bank {

private int account = 100;
//需要声明这个锁
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
lock.lock();
try{
account += money;
}finally{
lock.unlock();
}

}
       
    注:关于Lock对象和synchronized关键字的选择: 
        (1)最好两个都不用,使用一种java.util.concurrent包提供的机制, 
            能够帮助用户处理所有与锁相关的代码。 
        b.如果synchronized关键字能满足用户的需求,就用synchronized,因为它能简化代码 
        c.如果需要更高级的功能,就用ReentrantLock类,此时要注意及时释放锁,否则会出现死锁,通常在finally代码释放锁 
       
    注:多线程中的非同步问题主要出现在对域的读写上,如果让域自身避免这个问题,则就不需要修改操作该域的方法。 
    用final域,有锁保护的域和volatile域可以避免非同步的问题。
   

    5、使用局部变量实现线程同步 
    如果使用ThreadLocal管理变量,则每一个使用该变量的线程都获得该变量的副本,副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。
    ThreadLocal 类的常用方法
    (1)ThreadLocal() : 创建一个线程本地变量 
    (2)get() : 返回此线程局部变量的当前线程副本中的值 
    (3)initialValue() : 返回此线程局部变量的当前线程的"初始值" 
    (4)set(T value) : 将此线程局部变量的当前线程副本中的值设置为value

    例如: 
        在上面例子基础上,修改后的代码为:    

//只改Bank类,其余代码与上同
public class Bank{
//使用ThreadLocal类管理共享变量account
private static ThreadLocal<Integer> account = new ThreadLocal<Integer>(){
@Override
protected Integer initialValue(){
return 100;
}
};
public void save(int money){
account.set(account.get()+money);
}
public int getAccount(){
return account.get();
}
}
    注:ThreadLocal与同步机制 
        (1)ThreadLocal与同步机制都是为了解决多线程中相同变量的访问冲突问题。 
        (2)前者采用以"空间换时间"的方法,后者采用以"时间换空间"的方式

三、生产者消费者问题

    先来一段down来的概念:

    1、生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

2、解决方法 要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。 分析一下:1、产生数据的线程 2、缓冲区3、消耗数据的线程4、线程间通信(达到同步&互斥) 3、实例分析

多个生产者和多个消费者就是多条执行线程,他们共同操作一个数据结构中的数据,数据结构中有时是没有数据的,这个时候消费者应该处于等待状态而不是不断的去访问这个数据结构。这里就涉及到线程间通信(当然此处还涉及到互斥,这里暂不考虑这一点),消费者线程一次消费后发现数据结构空了,就应该处于等待状态,生产者生产数据后,就去唤醒消费者线程开始消费。生产者线程某次生产后发现数据结构已经满了,也应该处于等待状态,消费者消费一条数据后,就去唤醒生产者继续生产。

实现这种线程间同步,可以通过Object类提供的wait,notify, notifyAll 3个方法去进行即可。一个简单的生产者和消费者的例子代码为:

[java] view plain copy
  1. package cn.test;  
  2.   
  3. public class ProducerConsumer {  
  4.       
  5.     public static void main(String[] args) {  
  6.           
  7.         final MessageQueue mq = new MessageQueue(10);  
  8.         // 创建3个生产者  
  9.         for(int p=0;p<3;p++){  
  10.               
  11.             new Thread(new Runnable(){  
  12.   
  13.                 @Override  
  14.                 public void run() {  
  15.                       
  16.                     while(true){  
  17.                           
  18.                         mq.put("消息来了!");  
  19.                         // 生产消息后,休息100毫秒  
  20.                         try {  
  21.                             Thread.currentThread().sleep(100);  
  22.                         } catch (InterruptedException e) {  
  23.                             e.printStackTrace();  
  24.                         }  
  25.                     }  
  26.                 }  
  27.                   
  28.                   
  29.             }, "Producer" + p).start();  
  30.         }  
  31.           
  32.         // 创建3个消费者  
  33.         for(int s=0;s<3;s++){  
  34.               
  35.             new Thread(new Runnable(){  
  36.   
  37.                 @Override  
  38.                 public void run() {  
  39.                       
  40.                     while(true){  
  41.                           
  42.                         mq.get();  
  43.                         // 消费消息后,休息100毫秒  
  44.                         try {  
  45.                             Thread.currentThread().sleep(100);  
  46.                         } catch (InterruptedException e) {  
  47.                             e.printStackTrace();  
  48.                         }  
  49.                     }  
  50.                 }  
  51.                   
  52.                   
  53.             }, "Consumer" + s).start();  
  54.         }  
  55.     }  
  56.       
  57.     /** 
  58.      * 内部类模拟一个消息队列,生产者和消费者就去操作这个消息队列 
  59.      */  
  60.     private static class MessageQueue{  
  61.           
  62.         private String[] messages;// 放置消息的数据结构  
  63.         private int opIndex; // 将要操作的位置索引  
  64.   
  65.         public MessageQueue(int size) {  
  66.               
  67.             if(size <= 0){  
  68.                   
  69.                 throw new IllegalArgumentException("消息队列的长度至少为1!");  
  70.             }  
  71.             messages = new String[size];  
  72.             opIndex = 0;  
  73.         }  
  74.           
  75.         public synchronized void put(String message){  
  76.               
  77.             // Java中存在线程假醒的情况,此处用while而不是用if!可以参考Java规范!  
  78.             while(opIndex == messages.length){  
  79.                   
  80.                 // 消息队列已满,生产者需要等待  
  81.                 try {  
  82.                     wait();  
  83.                 } catch (InterruptedException e) {  
  84.                     e.printStackTrace();  
  85.                 }  
  86.             }  
  87.             messages[opIndex] = message;  
  88.             opIndex++;  
  89.             System.out.println("生产者 " + Thread.currentThread().getName() + " 生产了一条消息: " + message);  
  90.             // 生产后,对消费者进行唤醒  
  91.             notifyAll();  
  92.         }  
  93.           
  94.         public synchronized String get(){  
  95.               
  96.             // Java中存在线程假醒的情况,此处用while而不是用if!可以参考Java规范!  
  97.             while(opIndex == 0){  
  98.                   
  99.                 // 消息队列无消息,消费者需要等待  
  100.                 try {  
  101.                     wait();  
  102.                 } catch (InterruptedException e) {  
  103.                     e.printStackTrace();  
  104.                 }  
  105.             }  
  106.             String message = messages[opIndex-1];  
  107.             opIndex--;  
  108.             System.out.println("消费者 " + Thread.currentThread().getName() + " 消费了一条消息: " + message);  
  109.             // 消费后,对生产者进行唤醒  
  110.             notifyAll();  
  111.             return message;  
  112.         }  
  113.           
  114.     }  
  115.   
  116. }  


一次输出为:

[java] view plain copy
  1. 消费者 Consumer1 消费了一条消息: 消息来了!  
  2. 生产者 Producer0 生产了一条消息: 消息来了!  
  3. 消费者 Consumer0 消费了一条消息: 消息来了!  
  4. 生产者 Producer2 生产了一条消息: 消息来了!  
  5. 消费者 Consumer2 消费了一条消息: 消息来了!  
  6. 生产者 Producer1 生产了一条消息: 消息来了!  
  7. 消费者 Consumer0 消费了一条消息: 消息来了!  
  8. 生产者 Producer0 生产了一条消息: 消息来了!  
  9. 消费者 Consumer1 消费了一条消息: 消息来了!  
  10. 生产者 Producer2 生产了一条消息: 消息来了!  
  11. 消费者 Consumer2 消费了一条消息: 消息来了!  
  12. 生产者 Producer0 生产了一条消息: 消息来了!  
  13. 消费者 Consumer1 消费了一条消息: 消息来了!  
  14. 生产者 Producer1 生产了一条消息: 消息来了!  
  15. 消费者 Consumer0 消费了一条消息: 消息来了!  
  16. 生产者 Producer2 生产了一条消息: 消息来了!  
  17. 消费者 Consumer0 消费了一条消息: 消息来了!  
  18. 生产者 Producer1 生产了一条消息: 消息来了!  
  19. 生产者 Producer0 生产了一条消息: 消息来了!  
  20. 消费者 Consumer2 消费了一条消息: 消息来了!  
  21. 消费者 Consumer1 消费了一条消息: 消息来了!  
  22. 生产者 Producer1 生产了一条消息: 消息来了!