JAVA多线程学习(一)

时间:2021-10-12 17:29:17

//Wait and notify notifyAll

public class WaitTest {


static Object o = new Object();

/**
* @param args
*/
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
for(int i=0;i<10;i++){
synchronized(o){
try {
System.out.println(Thread.currentThread().getName()+"Wait");
o.wait();
System.out.println(Thread.currentThread().getName()+"Wait end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

});
t1.start();

new Thread(new Runnable(){

@Override
public void run() {
for(int i=0;i<10;i++){

try {
synchronized(o){
o.notifyAll();
System.out.println(Thread.currentThread().getName()+"notify");
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}


}
}

}).start();

}

}

wait()、notify()、notifyAll()是三个定义在Object类里的方法,可以用来控制线程的状态。

这三个方法最终调用的都是jvm级的native方法。随着jvm运行平台的不同可能有些许差异。

 

  • 如果对象调用了wait方法就会使持有该对象的线程把该对象的控制权交出去,然后处于等待状态。
  • 如果对象调用了notify方法就会通知某个正在等待这个对象的控制权的线程可以继续运行。
  • 如果对象调用了notifyAll方法就会通知所有等待这个对象控制权的线程继续运行。
线程必须有欲调用的实例的锁定,才能执行notify方法,这点跟wait方法一样,也是规则。

被notify唤醒的线程并不是在notify的一瞬间重新开始执行。因为在notify的那一刻,执行notify的线程还握着锁定不放,所以其他线程无法获取该实例的锁定。

notify方法跟notifyAll方法两者操作的差异。notify只唤醒1个线程,而notifyAll则唤醒所有线程。两者的差别就只有这里。


插播:wait同步的对象是线程的话,那么同步锁对象(线程)执行完毕后会调用notifyAll.


JAVA中生产者消费者模式三种实现方式:

实现方式1:

import java.util.LinkedList;
import java.util.Random;

public class ProductConsumer1 {
private LinkedList<Object> lst = new LinkedList<Object>();
private int full = 10;
class Consumer extends Thread{

public void run(){
System.out.println("Consumer start");
while(true){
synchronized(lst){
while(lst.size()>=full){
try {
lst.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object o = new String("Int:"+new Random().nextInt(100));
lst.add(o);
lst.notify();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Product extends Thread{
public void run(){
System.out.println("Product start");
while(true){
synchronized(lst){
while(lst.isEmpty()){
try{
lst.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
Object o = lst.removeLast();
System.out.println("Consumer:"+o.toString());
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void start(){
new Product().start();
new Consumer().start();
}
/**
* @param args
*/
public static void main(String[] args) {
ProductConsumer1 pc = new ProductConsumer1();
pc.start();
}

}

实现方式2:使用ReentrantLock和Condition(注意Condition的是await和signal成对)

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ProductConsumer2 {
private LinkedList<Object> lst = new LinkedList<Object>();
private int MAX = 10;
private final ReentrantLock lock = new ReentrantLock();
private final Condition full = lock.newCondition();
private final Condition empty = lock.newCondition();

public ProductConsumer2(){

}

public void start(){
new Product().start();
new Consumer().start();
}

public static void main(String[] args) {
ProductConsumer2 pc = new ProductConsumer2();
pc.start();
}

class Product extends Thread{

public void run(){
System.out.println("Consumer start");
while(true){
lock.lock();
try {
while(lst.size()>=MAX){
System.out.println("Pool is full");
full.await();
}

Object o = new String("Int:"+new Random().nextInt(100));
lst.add(o);
System.out.println("Product:"+o.toString());
empty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
}

class Consumer extends Thread{
public void run(){
System.out.println("Product start");
while(true){
lock.lock();
try {
while(lst.isEmpty()){
System.out.println("Pool is empty");
empty.await();
}
System.out.println("lst.size()"+lst.size());
Object o = lst.removeLast();
System.out.println("Consumer:"+o.toString());
full.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
}


}

3、使用LinkedBlockingQueue,注意使用的是此queue的put和take方法,不是使用add和remove方法.

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;

public class ProductConsumer3 {

private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10);
class Product extends Thread{
public void run(){
while(true){
Object o = new String("Int:"+new Random().nextInt(100));
System.out.println("Product:"+o.toString());
try {
queue.put(o);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Consumer extends Thread{
public void run(){
while(true){
Object o=null;
try {
o = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumer:"+o);
}
}
}

public void start(){
new Product().start();
new Consumer().start();
}
/**
* @param args
*/
public static void main(String[] args) {
ProductConsumer3 pc = new ProductConsumer3();
pc.start();
}

}


但上述的设计生产者和设计者的耦合性比较强。在多线程模式中,可以再Producer参与者和Comsumer参与者之间增加中间者——Channel参与者。Channel参与者提供线程安全的put和take方法。这时Producer就不要考虑对池里对象访问的线程安全设计,只需要调用Channel方法就行。


下面有些包装方法,可以用来让集合架构类可以线程安全地运行:

synchronizedList/synchronizedCollection/synchronizedMap/synchronizedSet/synchronizedSortedMap/synchronizedSortedSet方法


synchronized方法和synchronized块,无论碰到return或是异常,都会确实解除锁定。


要在线程间共享long或double的字段时,必须在synchronized中操作,或是声明成volatile。


java.lang.String是用来表示字符串的类。String类并没有提供任何改变字符串属性的方法。也就是说,String实例所代表的字符串属性,绝对不会改变。

因此,Strig类的方法,都不需要synchronized


Java的标准类链接库里,就具有成对的mutable类与immutable类,例如java.lang.StringBuffer类与java.lang.String类。


插播:要将值指定给final变量字段,有两种方法:第一种是在声明字段时就直接赋初始值;另外一种则是在构造器中将值赋给字段。


任何线程在任何时刻,都可以调用其他线程的interrupt方法。当Sleep中的线程被调用interrupt方法时,就会放弃暂停的状态,并抛出InterruptedException异常。

当线程wait时,要小心锁定的问题。线程在进入等待区时,会把锁定解除。当对wait中的线程调用interrupt时,会先重新获取锁定,再抛出InterruptedException。获取锁定之前,并无法抛出InterruptedException异常。


interrupt这个方法只会改变线程的中断状态。线程A在执行sleep、wait、join时,线程B调用了线程A的interrupt方法,的确这个时候线程A里会有interruptedException异常跑出来。但这其实是在sleep、wait、join这些方法内部会不断检查中断状态的值,而自己抛出的InterruptedException。但是如果A进行在进行计算语句或者for语句、while语句、if语句、调用方法,这些语句就不会去检查中断状态,所以线程A只有这些语句时不会抛出异常,而会一直继续着自己的操作。当线程A终于执行到sleep/wait/join方法时,才会抛出interruptedException。


Thread类的实例方法isInterrupted方法可用来检查指定线程的中断状态。Thread.interrupted方法会检查现在线程的中断状态,并清除之。


自己来编写读写锁ReadWriteLock类,为了保护安全性,我们必须防止下面两种冲突:

1、读取和写入的冲突;

2、写入和写入的冲突;

读取与读取之间不会冲突。

import java.util.Arrays;
import java.util.Random;


class Data{

private char []buffer;

ReadWriteLock lock = new ReadWriteLock();
public Data(int size){
buffer = new char[size];
for(int i=0;i<size;i++){
buffer[i] = '*';
}
}
public void read() throws InterruptedException{
while(true){
try {
lock.readLock();
doRead();
slowly();
}
finally{
lock.readUnLock();
}
}


}

public void write()throws InterruptedException{
while(true){
try{
lock.writeLock();
doWrite();
}finally{
lock.writeUnLock();
}
}

}

public void doRead(){
System.out.println(Thread.currentThread().getName()+"doRead():"+Arrays.toString(buffer));
}

public void doWrite(){
System.out.println(Thread.currentThread().getName()+"doWirte()");
Random r = new Random();
for(int i=0;i<buffer.length;i++){
buffer[i]= (char)('a'+r.nextInt(26));
slowly();
}
}

private void slowly(){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Writer extends Thread{
private Data data;
public Writer(String name,Data data){
super(name);
this.data = data;
}

public void run(){
System.out.println(Thread.currentThread().getName()+":start");
try {
data.write();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Reader extends Thread{
private Data data;
public Reader(String name,Data data){
super(name);
this.data = data;
}

public void run(){
System.out.println(Thread.currentThread().getName()+":start");
try {
data.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class ReadWriteLock{

private int writeNums = 0; //写线程数目
private int readNums = 0; //读线程数
private boolean preferWrite = true; //写优先
private int waitWrites = 0; //等待写的线程

public synchronized void readLock() throws InterruptedException{
while(writeNums > 0 ||(preferWrite && waitWrites>0)){
wait();
}
readNums++;
}

public synchronized void readUnLock() throws InterruptedException{
readNums--;
preferWrite = true;
notifyAll();
}

public synchronized void writeLock() throws InterruptedException{

waitWrites++;
try{
while(readNums > 0 || writeNums >0){
wait();
}
}finally{
waitWrites--;
}
writeNums++;
}

public synchronized void writeUnLock() throws InterruptedException{
writeNums--;
preferWrite = false;
notifyAll();
}
}

public class ReadWriteLockTest {

/**
* @param args
*/
public static void main(String[] args) {

Data data = new Data(10);

new Reader("Reader1",data).start();
new Reader("Reader2",data).start();
new Reader("Reader3",data).start();
new Reader("Reader4",data).start();
new Writer("Writer1",data).start();
new Writer("Writer2",data).start();

}

}

JAVA中提供的读写所ReentrantReadWriteLock的使用:


写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。
读取锁不支持 Condition,readLock().newCondition() 会抛出 UnsupportedOperationException

   (a).重入方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock想要获得WriteLock则永远都不要想。
   (b).WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能.


package com.zq.readwritelock;

import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteReadLockSupport{
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Lock rLock = lock.readLock();
private Lock wLock = lock.writeLock();
private LinkedList<Object> lst = new LinkedList<Object>();

public boolean find(Object o){
rLock.lock();
try{
return lst.contains(o);
}finally{
rLock.unlock();
}
}

public int size(){
rLock.lock();
try{
System.out.println("size():"+lst.size());
return lst.size();
}finally{
rLock.unlock();
}
}

public void put(Object o){
wLock.lock();
try{
lst.add(o);
slowly();
}finally{
wLock.unlock();
}
}

public void putAndReturnSize(Object o){
wLock.lock();
lst.add(o);
rLock.lock();
wLock.unlock();
System.out.println("After put "+o.toString()+",Size is"+lst.size());
rLock.unlock();
slowly();
}

public void slowly(){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
public class ReentrantReadWriteLockTest {

static final WriteReadLockSupport su = new WriteReadLockSupport();

/**
* @param args
*/
public static void main(String[] args) {
Thread t1 = new Thread(){
public void run(){
for(int i=0;i<10;i++){
su.putAndReturnSize("rwt1:"+i);
}
}
};

Thread t2 = new Thread(){
public void run(){
for(int i=0;i<10;i++){
su.putAndReturnSize("rwt2:"+i);
}
}
};

Thread t3 = new Thread(){
public void run(){
for(int i=0;i<10;i++){
su.put("wt1:"+i);
}
}
};

Thread t4 = new Thread(){
public void run(){
for(int i=0;i<10;i++){
su.put("wt2:"+i);
}
}
};

Thread t5 = new Thread(){
public void run(){
for(int i=0;i<10;i++){
su.find("wt1:"+i);
}
}
};

Thread t6 = new Thread(){
public void run(){
for(int i=0;i<10;i++){
su.size();
}
}
};

t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
t6.start();

}

}

如果你有许多需要长时间运行的任务同时执行,并需要等所有的这些线程都执行完毕,还想得到一个返回值,那么这就有点小小难度了。但 Java 已经有解决方案给你,那就是 Executors ,一个简单的类可以让你创建线程池和线程工厂。

以下是Java自带的几种线程池:
1、newFixedThreadPool  创建一个指定工作线程数量的线程池。
2、newCachedThreadPool 创建一个可缓存的线程池。
3、newSingleThreadExecutor 创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行。
4、newScheduleThreadPool  创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。

一旦你创建了一个线程池,你就可以往池中通过不同的方法提交执行任务,可提交 Runnable 或者 Callable 到线程池中,该方法返回一个 Future 实例表示任务的状态,如果你提交一个 Runnable ,那么如果任务完成后 Future 对象返回 null。


package com.zq.threadpool;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


final class ResultThread implements Callable<String>{
@Override
public String call() throws Exception {
return Thread.currentThread().getName()+":"+Math.random();
}

}

class RunnableThread implements Runnable{

@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":Runnable:"+Math.random());
}

}

public class CompletionServiceTest {

/**
* @param args
*/
public static void main(String[] args) {
final ExecutorService threadPool = Executors.newFixedThreadPool(4);
final CompletionService<String> pool = new ExecutorCompletionService<String>(threadPool);
for(int i=0;i<10;i++){
pool.submit(new ResultThread());
threadPool.submit(new RunnableThread());
}

try {
for(int i=0;i<10;i++){
String result;

result = pool.take().get();

System.out.println(result);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally{
threadPool.shutdown();
}

}

}



CompletionService 就是一个服务,用以简化等待任务的执行结果。当然Future也可以用来等待线程返回结果,但是不建议这里使用。

List<Future<String>> futures = new ArrayList<Future<String>>(10);
for(int i = 0; i < 10; i++){
   futures.add(pool.submit(new ResultThread()));
}
for(Future<String> future : futures){
   String result = future.get();
 
   //Compute the result
}
pool.shutdown();