Java 并发编程之线程池的使用 (二)

时间:2022-12-04 18:03:43

设置线程池的大小

如果线程池过大,那么可能会耗尽资源 ,如果过小,那么 将导致许多空闲的处理器无法工作,从而降低吞吐率。


要设置正确的线程池大小,需要分析计算环境,资源预算和任务的特性,cpu数量,内存大小,任务是计算密集型还是I/O密集型,还是二者皆可。它们是否需要像JDBC连接这样的稀缺资源,下面给出一个计算公式

N(threads)=N(cpu)*U(cpu)*(1+w/c);

N(threads)是最后得到的结果大小 。

N(cpu)是cpu数量,我的电脑是双核四线程,cpu的数量会是4,可以通过        System.out.println(Runtime.getRuntime().availableProcessors());来得到cpu数量

U(cpu)是目标cpu使用率,取决于程序员的期望,一般在50%左右。这个值限制在0到1之间

w/c是wait time/compute time的比率。

配置ThreadPoolExecutor

ThreadPoolExecutor为一些Executor提供了基本的实现,比如,newCachedThreadPool,newFixedThreadPool等等。ThreadPoolExecutor允许各种定制

它的构造函数如下

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}
我们最常用的newCachedThreadPool。

  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
线程最小数量是0,最大大小是整数最大值,相当于无限,阻塞限制时间是一分钟,如果一个线程闲置时间超过1分钟,那么会被回收。

管理队列任务

根据线程池的大小来选择合适的队列有利于充分利用资源和防止耗尽资源。

基本的排队方法有3种:

  • *队列
  • 有界队列
  • 同步移交

newFixedThreadPool和newSingleThreadPool在默认情况下将使用一个*的LinkedBlockingQueue。当所有线程都在忙碌状态时,这个队列将无限制的增加。

一种更稳妥的资源管理策略是使用有界队列:如ArrayBlockingQueue,有界的LinkedBlockingQueue,PriorityBlockingQueue。

Queue<String> que1 = new ArrayBlockingQueue<String>(200);
		Queue<String> que2 = new LinkedBlockingDeque<String>(200);
		Queue<String> que3 = new PriorityBlockingQueue<String>(200);

对于非常大的或者*的线程池,可以通过使用SynchronousQueue来避免任务排队。以及直接将任务从生产者移交到工作者中。它并不是一个真正的队列 ,而一种在线程之交进行移交的机制 。要将一个元素放入SynchronousQueue中,必须有一个线程在等待接受这个元素,如果没有线程在等待,并且线程池当前大小为最大值的时候 ,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将会被拒绝。

测试代码:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadTest {
	ExecutorService exec = new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>());

	private void putrunnable() {
		for (int i = 0; i < 4; i++) {
			exec.submit(new Runnable() {

				@Override
				public void run() {
					// TODO Auto-generated method stub
					while (true) {
						System.out.println(Thread.currentThread().getName());
						try {
							Thread.sleep(500);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}
			});
		}

	}

	public static void main(String[] args) {
		new ThreadTest().putrunnable();
	}
}

输出结果如下:

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3d4eac69 rejected from java.util.concurrent.ThreadPoolExecutor@42a57993[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at java.util.concurrent.AbstractExecutorService.submit(Unknown Source)
    at ThreadDeadlock.putrunnable(ThreadDeadlock.java:12)
    at ThreadDeadlock.main(ThreadDeadlock.java:33)
pool-1-thread-1
pool-1-thread-2


对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。这种差异是因为使用了SynchronousQueue而不是LinkedBlockingQueu,在Java6中便一个新的非阻塞算法来替代Java5的算法,该算法使用它们吞吐量提高了3倍。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池。比如接受网络客户请求的服务器应用程序。

只有当任务相互独立时,为线程池或工作队列设置界限才是合理 的,如果任务之间存在依赖性,那么应该使用*的newCacheThreadPool。否则很可能发生死锁,即正在执行的任务等待尚未开始执行任务的执行结果。

饱和策略

我们在ThreadPoolExecutor的构造函数中看到了最后一个参数。  RejectedExecutionHandler handler。这个就是饱和策略。

JDK提供了几种不同的实现:

  • DiscardOldestPolicy
  • AbortPolicy
  • CallerRunsPolicy
  • discardPolicy

AbortPolicy是默认的饱和策略,就是中止任务,该策略将抛出RejectedExecutionException。调用者可以捕获这个异常然后去编写代码处理异常。

当新提交的任务无法保存到队列中等待执行时,DiscardPolicy会稍稍的抛弃该任务,DiscardOldestPolicy则会抛弃最旧的(下一个将被执行的任务),然后尝试重新提交新的任务。如果工作队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会导致优先级最高的那个任务被抛弃,所以两者不要组合使用。

CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了Executor的线程中执行该任务,

比如我们将上面的那个栗子代码加上调用者运行的饱和策略

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

public class ThreadDeadlock {
	ExecutorService exec = new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>(), new CallerRunsPolicy());

	private void putrunnable() {
		for (int i = 0; i < 4; i++) {
			exec.submit(new Runnable() {

				@Override
				public void run() {
					// TODO Auto-generated method stub
					while (true) {
						System.out.println(Thread.currentThread().getName());
						try {
							Thread.sleep(500);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}
			});
		}

	}

	public static void main(String[] args) {
		new ThreadDeadlock().putrunnable();
	}
}

之后的输出结果会变为

pool-1-thread-1
main
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
main
pool-1-thread-1
pool-1-thread-2
main
pool-1-thread-2
main
pool-1-thread-1

这样做的目的是因为,让任务在提交任务中运行。这样因为它运行本身需要的时间会降低任务提交的速率,为工作者争取时间 。应用在Socket编程中,主线程停止accept,到达的请求会被保存在TCP层中,当TCP队列填满时,会同样开始抛弃请求,这样会导致阻塞一直传递到客户端,实现高负载情况下的一种平缓的性能降低 。


当工作队列被填满后,没有预定义的饱和策略来阻塞Execute(除了抛弃就是中止还有去让调用者去执行),但这不并不能阻止任务被提交。通过Semaphore可以解决这个问题。

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

public class BounedExecutor {
	private final Executor exec;
	private final Semaphore semaphore;

	public BounedExecutor(Executor exec, int bound) {
		super();
		this.exec = exec;
		this.semaphore = new Semaphore(bound);
	}

	public void submitTask(final Runnable command) throws InterruptedException {
		semaphore.acquire();
		try {
			exec.execute(new Runnable() {

				@Override
				public void run() {
					// TODO Auto-generated method stub
					try {
						System.out.println("command.submit;");
						command.run();
					} finally {
						semaphore.release();
					}

				}
			});
		} catch (RejectedExecutionException e) {
			semaphore.release();
		}

	}

	public static void main(String[] args) {
		BounedExecutor be = new BounedExecutor(Executors.newFixedThreadPool(3),
				3);
		for (int i = 0; i < 6; i++) {
			submittask(be);
		}

	}

	private static void submittask(BounedExecutor be) {
		// TODO Auto-generated method stub
		try {
			be.submitTask(new Runnable() {

				@Override
				public void run() {
					// TODO Auto-generated method stub
					while (true) {
						System.out.println(Thread.currentThread().getName());
						try {
							Thread.sleep(500);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}
			});
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

信号量的上界应设置为线程池的大小加上可排队的数量。这是因为信号量需要下在执行的和等待执行的任务数量。


结果如下:

command.submit;
command.submit;
pool-1-thread-1
pool-1-thread-2
command.submit;
pool-1-thread-3
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-2

..

可以看出,在提交了线程池大小的数量之后就再没提交过task了。