Java并发编程知识总结

时间:2024-08-23 23:34:50

一、线程

1、线程创建

  • 继承Thread类创建线程类
  • 实现Runnable接口创建线程类
  • 使用Callable和Future创建线程

Runnable是执行工作的独立任务,但是它不返回任何值,如果希望任务完成时能够返回一个值,可以实现Callable接口

  class TestThread implements Callable<Integer> {

        @Override
public Integer call() throws Exception {
return 1;
}
}   //测试方法
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool(); //submit()的方法声明如下: <T> Future<T> submit(Callable<T> task);
Future<Integer> result = exec.submit(new TestThread()); exec.shutdown();
}

2、设置线程优先级

Thread.currentThread().setPriority(int)

Thread类中定义的优先级常量:Thread.MAX_PRIORITY,Thread.MIN_PRIORITY , Thread.NORM_PRIORITY

3、控制线程

Thread.sleep();
使当前线程(即调用该方法的线程)暂停执行一段时间,让其他线程有机会继续执行,但它并不释放对象锁。也就是说如果有synchronized同步快,其他线程仍然不能访问共享数据

Thread.yield();
该方法与sleep()类似,只是不能由用户指定暂停多长时间,并且yield()方法只能让同优先级的线程有执行的机会。

Thread.join();
如果某个线程调用另一个线程(t)的join方法,此线程将被挂起,直到目标线程t结束才恢复(即t.isAlive() 返回false)。

wait()和notify()、notifyAll()

这三个方法用于协调多个线程对共享数据的存取,所以必须在synchronized语句块内使用。synchronized关键字用于保护共享数据,阻止其他线程对共享数据的存取,但是这样程序的流程就很不灵活了,如何才能在当前线程还没退出synchronized数据块时让其他线程也有机会访问共享数据呢?此时就用这三个方法来灵活控制。

wait()方法使当前线程暂停执行并释放对象锁标示,让其他线程可以进入synchronized数据块,当前线程被放入对象等待池中。当调用notify()方法后,将从对象的等待池中移走一个任意的线程并放到锁标志等待池中,只有锁标志等待池中线程能够获取锁标志;如果锁标志等待池中没有线程,则notify()不起作用。

notifyAll()则从对象等待池中移走所有等待那个对象的线程并放到锁标志等待池中。

注意:wait()和notify(),notifyAll()是Object类的方法,sleep()和yield()是Thread类的方法。

后台线程

new Thread().setDaemon(boolean); //必须在线程启动之前调用才能线程设置成后台线程
当最后一个非后台线程终止时,后台线程会“突然”终止。因此一旦main()推出,JVM就会立即关闭所有后台线程

4、线程状态
1、新建(New):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必需的系统资源,并执行了初始化。此刻线程已经有资格获得CPU时间。之后调度器将把线程转变为可运行状态或阻塞状态。
2、就绪(Runnable):在这种状态下,只要调度器把时间片分配个线程,线程就可以运行。
3、阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入就绪状态,它才有可能执行操作。
4、死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已结束。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以被中断。

5、检查中断:Thread.interrupted();

6、未处理的异常

在线程中,一旦异常逃出任务的run方法,它就会向外传播到控制台。要捕获这种错误的异常,可以给每个Thread对象设置异常处理器。
Thread#setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)

Thread t = new Thread();

t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

  @Override

  public void uncaughtException(Thread t, Throwable e) {

 System.out.println(e.getCause());

  }

});
t.start();  

7、线程同步

基本上所有的并发模式在解决线程冲突的问题的时候,都是采用序列化访问共享资源的方案。
解决资源共享的方法:
synchronized关键字:当任务要执行被此关键字保护的代码段的时候,它将检查锁是否可用,然后获得锁,执行代码,释放锁。所有对象都自动包含单一的锁,当在对象上调用其任意synchronized方法的时候,此对象被加锁,这时该对象上的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用。一个任务可以多次获得对象的锁,比如在调用对象上的某个synchronized方法时,该方法又调用了第二个synchronized方法,此时任务会获得多个锁。JVM负责跟踪对象被加锁的次数,当锁被完全释放,计数为0,当任务第一次给对象加锁时,计数为1,每当这个相同的任务在这个对象上获得锁时,计数会递增。synchronized static 方法可以在类的范围防治对static静态数据的并发访问。
用法:
synchronized void f(){/** … **/}
void g(){ synchronized(object){}}
synchronized static void h(){/** … **/}
注意:在使用并发时,将域设置成private非常重要,否则synchronized关键字就无法防止其他任务直接访问域,导致冲突发生。

使用Lock
Lock对象必须给显式地创建,锁定和释放
用法: 


Lock lock = new ReentrantLock();

try {

  lock.lock();

  /**

   * do something

 */

  return;

} finally {

  lock.unlock();

}

注意:lock()的调用必须放置在finally子句中带有unlock()的try-finally语句中,return语句必须放在try子句中出现,以确保unlock()不会过早发生。

原子类AtomicInteger,AtomicLong,AtomicReference... ,常规编程很少会派上用场,但是在涉及性能调优的时候它们就大有用武之地。

Condition:使用互斥并允许任务挂起的基本类,可以通过在Condition上调用await()来挂起一个任务,当外部条件发生变化时,意味着某个任务应该继续执行时,可以调用Condition的signal() 来通知这个任务,从而唤醒一个任务,或者它调用signalAll()来唤醒所有在这个Condition上被其自身挂起的任务。(与Object类的notifyAll() 相比,signalAll()时更安全的方式)

用法:

class Test {
    
  private Lock lock = new ReentrantLock();

  private Condition condition = lock.newCondition();


  public void t1() {
   
    try {

        lock.lock();

        condition.signalAll();

    } finally {

  lock.unlock();

     }

  }


  public void t2() {

     try {

        lock.lock();

        condition.await();

     } catch (InterruptedException e) {

 e.printStackTrace();

     } finally {

 lock.unlock();

     }

  }

}

8、线程本地存储(ThreadLocal<E>)

线程本地存储是一种自动化机制,可以为使用相同变量的每个不同线程创建不同的存储。比如你有5个线程都要使用变量x所表示的对象,那线程本地存储就会生成5个用于x的不同存储块。
用法:

class Test implements Runnable {


 private ThreadLocal<Integer> value = new ThreadLocal<Integer>(){

private Random rand = new Random(47);

     @Override

     protected Integer initialValue() {

return rand.nextInt(100);

    }

};


  @Override

  public void run() {

 value.get();

     value.set(2);

  }

}

9、生产者消费者队列
ArrayBlockingQueue:具有固定的尺寸,元素先进先出
LinkedBlockingQueue:*对列,元素先进先出
用法:

BlockingQueue<Object> arrayQueue = new ArrayBlockingQueue<>(5);

BlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<>();

BlockingQueue<E>中定义的部分方法:

add(E e):向对列中添加一个元素,成功返回true,空间不足抛出IllegalStateException
offer(E e):向对列中添加一个元素,成功返回true,失败返回false
offer(E e,long timeout,TimeUnit unit):向对列中添加一个元素,成功返回true,失败返回false,空间不足时会等待给定时间后再进行插入
poll(long timeout,TimeUnit unit):返回并删除队头的元素,如果对列是空,会等待给定的时间
put(E e):向对列中添加一个元素,空间不足时会等待,直到有空间以插入新的元素
take(E e):返回并删除队头的元素,如果对列是空,会等待

任务间使用管道进行输入/输出
PipedWriter:允许任务向管道写
PipedReader:允许多个任务从同一个管道中读取
用法:

class Sender implements Runnable {


private Random random = new Random(47);

 private PipedWriter writer;


 public Sender(PipedWriter writer) {

this.writer = writer;

 }

 @Override

public void run() {

 try {

 for (char c = 'A'; c <= 'z'; c++) {

 writer.write(c);

TimeUnit.SECONDS.sleep(2);

 }

 } catch (IOException e) {

e.printStackTrace();

 } catch (InterruptedException e){

e.printStackTrace();

 }

}

} 

class Reseiver implements Runnable {

 private PipedReader reader;


public Reseiver(PipedReader reader) {

this.reader = reader;

}


@Override

public void run() {

try {

 while (true) {

 System.out.println((char)reader.read());

}

} catch (IOException e) {

e.printStackTrace();

 }

}

} //测试方法
public static void main(String[] args) throws InterruptedException, IOException {

ExecutorService exec = Executors.newCachedThreadPool();

PipedWriter writer = new PipedWriter();

Sender sender = new Sender(writer);

Reseiver reseiver = new Reseiver(new PipedReader(writer));

exec.execute(sender);

 exec.execute(reseiver);

 exec.shutdown();

}

Java SE5新类库中的构件:
CountDownLatch:用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。你可以向CountDownLatch对象设置一个初始值,任务在这个对象上调用await()的方法都将阻塞,直到这个计数值变成0。其他任务在结束其工作时,可以在该对象上调用countDown()来减少这个计数值。CountDownLatch被设计为只触发一次,计数值不能被重置。
用法:

public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

CountDownLatch latch = new CountDownLatch(10);

 service.execute(() -> {

try {

System.out.println("wait begin");

latch.await();

System.out.println("wait end...");

} catch (InterruptedException e) {

e.printStackTrace();

 }

});


for (int i = 0; i < 10; i++) {

service.execute(() -> {

 latch.countDown();

 System.out.println("countdown:" + latch.getCount());

});

}


 service.shutdown();


}

CyclicBarrier:有一组任务并行地执行工作,然后在进行下一个步骤前等待,直到所有任务都完成。与CountDownLatch相似,只是CountDownLatch只触发一次,而CyclicBarrier可以多次重用。CyclicBarrier中最重要的方法是await(),当任务调用await()时,线程会被挂起直到所有其他任务都到达barrier(其他任务都调用了await())。CyclicBarrier构造函数可以接受一个barrierAction(Runnable),barrierAction会在所有任务都到达barrier后被执行。
用法:

public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

 CyclicBarrier barrier = new CyclicBarrier(10,

() -> System.out.println("all reach barrier " +

Thread.currentThread().getName()));


for (int i = 0; i < 10; i++) {

 service.execute(() -> {

try {

System.out.println("wait for other" + Thread.currentThread());

 barrier.await();

 } catch (InterruptedException e) {

 e.printStackTrace();

} catch (BrokenBarrierException e) {

e.printStackTrace();

}

 });

 }


service.shutdown();

}

DelayQueue:一个*的BlockingQueue,用于放置实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。这个队列是有序的。注意此队列不允许插入null元素。
用法:

class DelayQueueTest implements Runnable, Delayed {


 private static int count = 0;

 private int id = count++;

private int delta;

 private long trigger;


 public DelayQueueTest(int delayInMilliseconds) {

this.delta = delayInMilliseconds;

 this.trigger = System.nanoTime() +
        TimeUnit.NANOSECONDS.convert(

delayInMilliseconds,

 TimeUnit.MILLISECONDS);

}

@Override

public void run() {

System.out.println("id:"+id+" delta="+delta);

 }


@Override

public long getDelay(TimeUnit unit) {

return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);

}


 @Override

public int compareTo(Delayed o) {

DelayQueueTest that = (DelayQueueTest) o;

if (this.trigger < that.trigger) {

return -1;

 }

if (this.trigger > that.trigger) {

return 1;

}

return 0;

}

} //测试方法
public static void main(String[] args) throws InterruptedException {

 ExecutorService service = Executors.newCachedThreadPool();

 DelayQueue<DelayQueueTest> queue = new DelayQueue<>();

Random random = new Random(47);


for (int i = 0; i < 10; i++) {

 DelayQueueTest dqt = new DelayQueueTest(random.nextInt(5000));

 queue.add(dqt);

}

 service.execute(() -> {

try {

while (!Thread.interrupted()){

queue.take().run();

 }

} catch (InterruptedException e) {

e.printStackTrace();

}

});

 service.shutdown();

}

PriorityBlockingQueue:优先级队列,一个*的BlockingQueue,具有可阻塞的读取操作。优先级队列中的元素是按照优先级顺序从队列中取出。即队头是优先级最高的元素。注意此队列不允许插入null元素。
用法:

class PriorityQueueTest implements Runnable,Comparable<PriorityQueueTest>{


 private static int count = 0;

 private int id = count++;


 private int priority;


 public PriorityQueueTest(int priority) {

this.priority = priority;

}


@Override

 public void run() {

System.out.println("id:" + id + " priority=" + priority);

 }


 @Override

 public int compareTo(PriorityQueueTest o) {

 return this.priority < o.priority ? 1 : (this.priority > o.priority ? -1 : 0);

}

} //测试方法
public static void main(String[] args) throws InterruptedException {

ExecutorService service = Executors.newCachedThreadPool();

Random random = new Random(47);


 PriorityBlockingQueue<PriorityQueueTest> queueTests = new PriorityBlockingQueue<>();

 for (int i = 0; i < 10; i++) {

 queueTests.put(new PriorityQueueTest(random.nextInt(200)));

}

 service.execute(() -> {

try {

while (!Thread.interrupted()) {

queueTests.take().run();

}

} catch (InterruptedException e) {

 e.printStackTrace();

 }

});

 service.shutdown();

}

二、线程池

使用Executor

常用类及方法说明:
ExecutorService:继承自Executor接口,是一个具有服务生命周期的Executor,例如关闭
Executors:Executor的工厂类或工具方法,包含创建ExecutorService和ScheduledExecutorService等的方法,
常用静态静态方法:
newCachedThreadPool :无边界可回收线程池,当线程池中没有可用的线程时会创建新的线程,如果有可用时会复用可用线程,60秒内没有被使用的线程会被终止
newSingleThreadScheduledExecutor:只有一个线程的线程池,与newFixedThreadPool(1) 不同的地方是它不可以在后续调整线程的数目,而newFixedThreadPool则可以通过ThreadPoolExecutor#setCorePoolSize
newScheduledThreadPool:包含指定数量线程的线程池,任务可以在指定时间后开始执行,并且可以设置周期性执行
newFixedThreadPool:包含指定数量线程的线程池

用法:

ExecutorService exec = Executors.newCachedThreadPool();
ExecutorService exec1 = Executors.newSingleThreadExecutor();
ExecutorService exec2 = Executors.newFixedThreadPool(10);
ExecutorService exec3 = Executors.newScheduledThreadPool(5);

ScheduledThreadPoolExecutor:一个可以在给定时间后执行任务,或周期性执行任务的ThreadPoolExecutor。此类同样实现了ExecutorService接口。

用法:

public static void main(String[] args) throws Exception {
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);

scheduler.schedule(() -> {

System.out.println("do something once");

},1000, TimeUnit.MILLISECONDS);
 scheduler.scheduleAtFixedRate(() -> {

System.out.println("do something periodically");

},1000,1000, TimeUnit.MILLISECONDS);

scheduler.shutdown();
}