java编写属于自己的线程池

时间:2022-09-05 00:27:27

什么是线程池

线程池就是以一个或多个线程[循环执行]多个应用逻辑的线程集合.

一般而言,线程池有以下几个部分:

完成主要任务的一个或多个线程.
用于调度管理的管理线程.
要求执行的任务队列.

线程池的作用:

线程池作用就是限制系统中执行线程的数量。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

自己实现线程池

根据如上对线程池的理解,我们自己编写一个属于自己的简单线程池:

简单的线程池接口:

java" id="highlighter_724444">
?
1
2
3
4
5
6
7
8
9
10
11
12
public interface ThreadPool<Job extends Runnable>{
  //执行一个任务(Job),这个Job必须实现Runnable
  void execute(Job job);
 //关闭线程池
 void shutdown();
 //增加工作者线程,即用来执行任务的线程
 void addWorkers(int num);
 //减少工作者线程
 void removeWorker(int num);
 //获取正在等待执行的任务数量
 void getJobSize();
}

客户端可以通过execute(Job)方法将Job提交入线程池来执行,客户端完全不用等待Job的执行完成。除了execute(Job)方法以外,线程池接口提供了增加/减少工作者线程以及关闭线程池的方法。每个客户端提交的Job都会进入到一个工作队列中等待工作者线程的处理。

线程池接口的默认实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{
 
  // 线程池维护工作者线程的最大数量
  private static final int MAX_WORKER_NUMBERS = 10;
  // 线程池维护工作者线程的默认值
  private static final int DEFAULT_WORKER_NUMBERS = 5;
  // 线程池维护工作者线程的最小数量
  private static final int MIN_WORKER_NUMBERS = 1;
  // 维护一个工作列表,里面加入客户端发起的工作
  private final LinkedList<Job> jobs = new LinkedList<Job>();
  // 工作者线程的列表
  private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
  // 工作者线程的数量
  private int workerNum;
  // 每个工作者线程编号生成
  private AtomicLong threadNum = new AtomicLong();
 
 //生成线程池
public DefaultThreadPool() {
    this.workerNum = DEFAULT_WORKER_NUMBERS;
    initializeWorkers(this.workerNum);
  }
 
  public DefaultThreadPool(int num) {
    if (num > MAX_WORKER_NUMBERS) {
      this.workerNum =DEFAULT_WORKER_NUMBERS;
    } else {
      this.workerNum = num;
    }
    initializeWorkers(this.workerNum);
  }
//初始化每个工作者线程
private void initializeWorkers(int num) {
    for (int i = 0; i < num; i++) {
      Worker worker = new Worker();
      //添加到工作者线程的列表
      workers.add(worker);
      //启动工作者线程
      Thread thread = new Thread(worker);
      thread.start();
    }
  }
 
public void execute(Job job) {
    if (job != null) {
    //根据线程的"等待/通知机制"这里必须对jobs加锁
      synchronized (jobs) {
        jobs.addLast(job);
        jobs.notify();
      }
    }
  }
  //关闭线程池即关闭每个工作者线程
   public void shutdown() {
    for (Worker w : workers) {
      w.shutdown();
    }
  }
   //增加工作者线程
    public void addWorkers(int num) {
    //加锁,防止该线程还么增加完成而下个线程继续增加导致工作者线程超过最大值
    synchronized (jobs) {
      if (num + this.workerNum > MAX_WORKER_NUMBERS) {
        num = MAX_WORKER_NUMBERS - this.workerNum;
      }
      initializeWorkers(num);
      this.workerNum += num;
    }
  }
  //减少工作者线程
public void removeWorker(int num) {
    synchronized (jobs) {
    if(num>=this.workerNum){
         throw new IllegalArgumentException("超过了已有的线程数量");
         }
      for (int i = 0; i < num; i++) {
        Worker worker = workers.get(i);
        if (worker != null) {
        //关闭该线程并从列表中移除
          worker.shutdown();
          workers.remove(i);
        }
      }
      this.workerNum -= num;
    }
  }
 
public int getJobSize() {
    // TODO Auto-generated method stub
    return workers.size();
  }
//定义工作者线程
class Worker implements Runnable {
    // 表示是否运行该worker
    private volatile boolean running = true;
 
    public void run() {
      while (running) {
        Job job = null;
        //线程的等待/通知机制
        synchronized (jobs) {
          if (jobs.isEmpty()) {
            try {
              jobs.wait();//线程等待唤醒
            } catch (InterruptedException e) {
              //感知到外部对该线程的中断操作,返回
              Thread.currentThread().interrupt();
              return;
            }
          }
          // 取出一个job
          job = jobs.removeFirst();
        }
        //执行job
        if (job != null) {
          job.run();
        }
      }
    }
 
    // 终止该线程
    public void shutdown() {
      running = false;
    }
  }
}



从线程池的实现中可以看出,当客户端调用execute(Job)方法时,会不断地向任务列表jobs中添加Job,而每个工作者线程会不读的从jobs上获取Job来执行,当jobs为空时,工作者线程进入WAITING状态。

当添加一个Job后,对工作队列jobs调用其notify()方法来唤醒一个工作者线程。此处我们不调用notifyAll(),避免将等待队列中的线程全部移动到阻塞队列中而造成资源浪费。

线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程。客户端线程把任务放入工作队列后便返回,而工作者线程则不端的从工作队列中取出工作并执行。当工作队列为空时,工作者线程进入WAITING状态,当有客户端发送任务过来后会通过任意一个工作者线程,随着大量任务的提交,更多的工作者线程被唤醒。

参考: 《java并发编程的艺术》 方腾飞

原文链接:https://blog.csdn.net/canot/article/details/50904001